This is an automated email from the ASF dual-hosted git repository. janhoy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/solr-orbit.git
commit e9b673e87f87db9e2238e2f445cf26c9575dafd6 Author: Jan Høydahl <[email protected]> AuthorDate: Fri May 22 00:55:18 2026 +0200 Remove OSB-specific dead code, binaries, and infrastructure Clean out everything that has no place in a Solr benchmark tool: **Kafka / async HTTP / gRPC:** - Remove kafka_client.py (Kafka producer for OpenSearch metrics streaming) - Remove async_connection.py (OpenSearch async HTTP connection layer) - Remove worker_coordinator/proto_helpers/ (gRPC bulk/query helpers) - Remove osbenchmark/data_streaming/ package (Kafka data pipeline) - Remove all corresponding unit tests **Bundled binaries:** - Remove osbenchmark/decompressors/pbzip2-{Darwin,Linux}-{arm64,x86_64,aarch64} - Remove scripts/pbzip2 These binaries are not redistributable in an ASF project; decompression will use the system pbzip2 or fallback to Python's bz2 module. **OpenSearch-specific infrastructure:** - Remove scripts/terraform/ (Terraform cluster provisioning for OpenSearch on AWS) - Remove samples/ccr/ (OpenSearch cross-cluster replication sample) - Remove tests for all of the above Part of #3 --- osbenchmark/async_connection.py | 307 ------------------ osbenchmark/data_streaming/__init__.py | 0 osbenchmark/data_streaming/data_producer.py | 15 - osbenchmark/decompressors/pbzip2-Darwin-arm64 | Bin 115448 -> 0 bytes osbenchmark/decompressors/pbzip2-Darwin-x86_64 | Bin 114624 -> 0 bytes osbenchmark/decompressors/pbzip2-Linux-aarch64 | Bin 225704 -> 0 bytes osbenchmark/decompressors/pbzip2-Linux-x86_64 | Bin 82620 -> 0 bytes osbenchmark/kafka_client.py | 70 ---- .../proto_helpers/ProtoBulkHelper.py | 71 ---- .../proto_helpers/ProtoQueryHelper.py | 206 ------------ .../worker_coordinator/proto_helpers/__init__.py | 23 -- samples/ccr/docker-compose-metricstore.yml | 44 --- samples/ccr/docker-compose.yml | 59 ---- samples/ccr/start.sh | 150 --------- samples/ccr/stop.sh | 5 - scripts/pbzip2 | 8 - scripts/terraform/.gitignore | 38 --- scripts/terraform/.terraform.lock.hcl | 103 ------ scripts/terraform/PROVISIONING_CLUSTER.md | 56 ---- .../terraform/modules/opensearch/jvm.v2.options | 93 ------ .../terraform/modules/opensearch/jvm.v3.options | 88 ----- scripts/terraform/modules/opensearch/opensearch.tf | 131 -------- .../terraform/modules/opensearch/os-cluster.yaml | 55 ---- scripts/terraform/modules/opensearch/os_cluster.sh | 130 -------- scripts/terraform/modules/opensearch/outputs.tf | 7 - scripts/terraform/modules/opensearch/variables.tf | 70 ---- scripts/terraform/outputs.tf | 17 - scripts/terraform/provision-cluster.tf | 194 ----------- scripts/terraform/resources.sh | 57 ---- scripts/terraform/terraform.tfvars.example | 2 - scripts/terraform/variables.tf | 21 -- .../core_plugin_source_downloader_test.py | 23 -- .../external_plugin_source_downloader_test.py | 54 ---- .../plugin_distribution_downloader_test.py | 28 -- .../installers/preparers/plugin_preparer_test.py | 41 --- tests/data_streaming/__init__.py | 7 - tests/data_streaming/producer_test.py | 172 ---------- tests/kafka_client_test.py | 133 -------- tests/test_async_connection.py | 62 ---- tests/worker_coordinator/proto_bulk_helper_test.py | 124 ------- .../worker_coordinator/proto_query_helper_test.py | 356 --------------------- 41 files changed, 3020 deletions(-) diff --git a/osbenchmark/async_connection.py b/osbenchmark/async_connection.py deleted file mode 100644 index 98ff32e7..00000000 --- a/osbenchmark/async_connection.py +++ /dev/null @@ -1,307 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. - -import asyncio -import json -import logging -from typing import Optional, List - -import aiohttp -import opensearchpy -from aiohttp import RequestInfo, BaseConnector -from aiohttp.client_proto import ResponseHandler -from aiohttp.helpers import BaseTimerContext -from multidict import CIMultiDictProxy, CIMultiDict -from yarl import URL - -from osbenchmark.utils import io - - -class StaticTransport: - def __init__(self): - self.closed = False - - def is_closing(self): - return False - - def close(self): - self.closed = True - - -class StaticConnector(BaseConnector): - async def _create_connection(self, req: "ClientRequest", traces: List["Trace"], - timeout: "ClientTimeout") -> ResponseHandler: - handler = ResponseHandler(self._loop) - handler.transport = StaticTransport() - handler.protocol = "" - return handler - - -class StaticRequest(aiohttp.ClientRequest): - RESPONSES = None - - async def send(self, conn: "Connection") -> "ClientResponse": - self.response = self.response_class( - self.method, - self.original_url, - writer=self._writer, - continue100=self._continue, - timer=self._timer, - request_info=self.request_info, - traces=self._traces, - loop=self.loop, - session=self._session, - ) - path = self.original_url.path - self.response.static_body = StaticRequest.RESPONSES.response(path) - return self.response - - -class StaticResponse(aiohttp.ClientResponse): - def __init__(self, method: str, url: URL, *, writer: "asyncio.Task[None]", - continue100: Optional["asyncio.Future[bool]"], timer: BaseTimerContext, request_info: RequestInfo, - traces: List["Trace"], loop: asyncio.AbstractEventLoop, session: "ClientSession") -> None: - super().__init__(method, url, writer=writer, continue100=continue100, timer=timer, request_info=request_info, - traces=traces, loop=loop, session=session) - self.static_body = None - - async def start(self, connection: "Connection") -> "ClientResponse": - self._closed = False - self._protocol = connection.protocol - self._connection = connection - self._headers = CIMultiDictProxy(CIMultiDict()) - self.status = 200 - return self - - async def text(self, encoding=None, errors="strict"): - return self.static_body - - -class RawClientResponse(aiohttp.ClientResponse): - """ - Returns the body as bytes object (instead of a str) to avoid decoding overhead. - """ - async def text(self, encoding=None, errors="strict"): - """Read response payload and decode.""" - if self._body is None: - await self.read() - - return self._body - - -class ResponseMatcher: - def __init__(self, responses): - self.logger = logging.getLogger(__name__) - self.responses = [] - - for response in responses: - path = response["path"] - if path == "*": - matcher = ResponseMatcher.always() - elif path.startswith("*"): - matcher = ResponseMatcher.endswith(path[1:]) - elif path.endswith("*"): - matcher = ResponseMatcher.startswith(path[:-1]) - else: - matcher = ResponseMatcher.equals(path) - - body = response["body"] - body_encoding = response.get("body-encoding", "json") - if body_encoding == "raw": - body = json.dumps(body).encode("utf-8") - elif body_encoding == "json": - body = json.dumps(body) - else: - raise ValueError(f"Unknown body encoding [{body_encoding}] for path [{path}]") - - self.responses.append((path, matcher, body)) - - @staticmethod - def always(): - # pylint: disable=unused-variable - def f(p): - return True - return f - - @staticmethod - def startswith(path_pattern): - def f(p): - return p.startswith(path_pattern) - return f - - @staticmethod - def endswith(path_pattern): - def f(p): - return p.endswith(path_pattern) - return f - - @staticmethod - def equals(path_pattern): - def f(p): - return p == path_pattern - return f - - def response(self, path): - for path_pattern, matcher, body in self.responses: - if matcher(path): - self.logger.debug("Path pattern [%s] matches path [%s].", path_pattern, path) - return body - - -class AIOHttpConnection(opensearchpy.AIOHttpConnection): - def __init__(self, - host="localhost", - port=None, - http_auth=None, - use_ssl=False, - ssl_assert_fingerprint=None, - headers=None, - ssl_context=None, - http_compress=None, - cloud_id=None, - api_key=None, - opaque_id=None, - loop=None, - trace_config=None, - **kwargs,): - super().__init__(host=host, - port=port, - http_auth=http_auth, - use_ssl=use_ssl, - ssl_assert_fingerprint=ssl_assert_fingerprint, - # provided to the base class via `maxsize` to keep base class state consistent despite OSB - # calling the attribute differently. - maxsize=max(256, kwargs.get("max_connections", 0)), - headers=headers, - ssl_context=ssl_context, - http_compress=http_compress, - cloud_id=cloud_id, - api_key=api_key, - opaque_id=opaque_id, - loop=loop, - **kwargs,) - - self._trace_configs = [trace_config] if trace_config else None - self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", False) - - static_responses = kwargs.get("static_responses") - self.use_static_responses = static_responses is not None - - if self.use_static_responses: - # read static responses once and reuse them - if not StaticRequest.RESPONSES: - with open(io.normalize_path(static_responses)) as f: - StaticRequest.RESPONSES = ResponseMatcher(json.load(f)) - - self._request_class = StaticRequest - self._response_class = StaticResponse - else: - self._request_class = aiohttp.ClientRequest - self._response_class = RawClientResponse - - async def _create_aiohttp_session(self): - if self.loop is None: - self.loop = asyncio.get_running_loop() - - if self.use_static_responses: - connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed) - else: - connector = aiohttp.TCPConnector( - limit=self._limit, - use_dns_cache=True, - ssl_context=self._ssl_context, - enable_cleanup_closed=self._enable_cleanup_closed - ) - - self.session = aiohttp.ClientSession( - headers=self.headers, - auto_decompress=True, - loop=self.loop, - cookie_jar=aiohttp.DummyCookieJar(), - request_class=self._request_class, - response_class=self._response_class, - connector=connector, - trace_configs=self._trace_configs, - ) - - -class AsyncHttpConnection(opensearchpy.AsyncHttpConnection): - def __init__(self, - host="localhost", - port=None, - http_auth=None, - use_ssl=False, - ssl_assert_fingerprint=None, - headers=None, - ssl_context=None, - http_compress=None, - cloud_id=None, - api_key=None, - opaque_id=None, - loop=None, - trace_config=None, - **kwargs,): - super().__init__(host=host, - port=port, - http_auth=http_auth, - use_ssl=use_ssl, - ssl_assert_fingerprint=ssl_assert_fingerprint, - # provided to the base class via `maxsize` to keep base class state consistent despite OSB - # calling the attribute differently. - maxsize=max(256, kwargs.get("max_connections", 0)), - headers=headers, - ssl_context=ssl_context, - http_compress=http_compress, - cloud_id=cloud_id, - api_key=api_key, - opaque_id=opaque_id, - loop=loop, - **kwargs,) - - self._trace_configs = [trace_config] if trace_config else None - self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", False) - - static_responses = kwargs.get("static_responses") - self.use_static_responses = static_responses is not None - - if self.use_static_responses: - # read static responses once and reuse them - if not StaticRequest.RESPONSES: - with open(io.normalize_path(static_responses)) as f: - StaticRequest.RESPONSES = ResponseMatcher(json.load(f)) - - self._request_class = StaticRequest - self._response_class = StaticResponse - else: - self._request_class = aiohttp.ClientRequest - self._response_class = RawClientResponse - - async def _create_aiohttp_session(self): - if self.loop is None: - self.loop = asyncio.get_running_loop() - - if self.use_static_responses: - connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed) - else: - connector = aiohttp.TCPConnector( - limit=self._limit, - use_dns_cache=True, - ssl_context=self._ssl_context, - enable_cleanup_closed=self._enable_cleanup_closed - ) - - self.session = aiohttp.ClientSession( - headers=self.headers, - auto_decompress=True, - loop=self.loop, - cookie_jar=aiohttp.DummyCookieJar(), - request_class=self._request_class, - response_class=self._response_class, - connector=connector, - trace_configs=self._trace_configs, - ) diff --git a/osbenchmark/data_streaming/__init__.py b/osbenchmark/data_streaming/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/osbenchmark/data_streaming/data_producer.py b/osbenchmark/data_streaming/data_producer.py deleted file mode 100755 index 13675b2f..00000000 --- a/osbenchmark/data_streaming/data_producer.py +++ /dev/null @@ -1,15 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. - -from abc import ABC, abstractmethod - -class DataProducer(ABC): - - @abstractmethod - def generate_chunked_data(self) -> None: - pass diff --git a/osbenchmark/decompressors/pbzip2-Darwin-arm64 b/osbenchmark/decompressors/pbzip2-Darwin-arm64 deleted file mode 100644 index e11b2308..00000000 Binary files a/osbenchmark/decompressors/pbzip2-Darwin-arm64 and /dev/null differ diff --git a/osbenchmark/decompressors/pbzip2-Darwin-x86_64 b/osbenchmark/decompressors/pbzip2-Darwin-x86_64 deleted file mode 100755 index 1d075887..00000000 Binary files a/osbenchmark/decompressors/pbzip2-Darwin-x86_64 and /dev/null differ diff --git a/osbenchmark/decompressors/pbzip2-Linux-aarch64 b/osbenchmark/decompressors/pbzip2-Linux-aarch64 deleted file mode 100755 index e299583d..00000000 Binary files a/osbenchmark/decompressors/pbzip2-Linux-aarch64 and /dev/null differ diff --git a/osbenchmark/decompressors/pbzip2-Linux-x86_64 b/osbenchmark/decompressors/pbzip2-Linux-x86_64 deleted file mode 100755 index 29ac9cbe..00000000 Binary files a/osbenchmark/decompressors/pbzip2-Linux-x86_64 and /dev/null differ diff --git a/osbenchmark/kafka_client.py b/osbenchmark/kafka_client.py deleted file mode 100644 index 2a228afb..00000000 --- a/osbenchmark/kafka_client.py +++ /dev/null @@ -1,70 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from aiokafka import AIOKafkaProducer -from osbenchmark.context import RequestContextHolder - -class KafkaMessageProducer: - def __init__(self, producer, topic): - self._producer = producer - self._topic = topic - self._ctx_holder = RequestContextHolder() - - @classmethod - async def create(cls, params): - """ - Creates a Kafka producer based on parameters in the ingestion source. - """ - - ingestion_source = params.get("ingestion-source", {}) - kafka_params = ingestion_source.get("param", {}) - topic = kafka_params.get("topic") - if not topic: - raise ValueError("No 'topic' specified in ingestion source parameters.") - bootstrap_servers = kafka_params.get("bootstrap-servers", "") - - producer = AIOKafkaProducer( - bootstrap_servers=bootstrap_servers, - key_serializer=str.encode, - value_serializer=str.encode - ) - await producer.start() - return cls(producer, topic) - - async def send_message(self, message, key=""): - """ - Sends a message to the producer's topic. - """ - await self._producer.send_and_wait(self._topic, message, key=key) - - async def stop(self): - """ - Stops the underlying producer. - """ - await self._producer.stop() - - @property - def new_request_context(self): - # Delegate to the internal holder - return self._ctx_holder.new_request_context diff --git a/osbenchmark/worker_coordinator/proto_helpers/ProtoBulkHelper.py b/osbenchmark/worker_coordinator/proto_helpers/ProtoBulkHelper.py deleted file mode 100644 index 61756ce5..00000000 --- a/osbenchmark/worker_coordinator/proto_helpers/ProtoBulkHelper.py +++ /dev/null @@ -1,71 +0,0 @@ -from opensearch.protobufs.schemas import document_pb2 - -def _parse_docs_from_body(body): - index_op_lines = body.decode('utf-8').split('\n') - doc_list = [] - for doc in index_op_lines[1::2]: - doc_list.append(doc) - return doc_list - -class ProtoBulkHelper: - # Build protobuf SearchRequest. - # Consumed from params dictionary: - # * ``body``: JSON body of bulk ingest request - # * ``index``: index name - @staticmethod - def build_proto_request(params): - index = params.get("index") - body = params.get("body") - doc_list = _parse_docs_from_body(body) - request = document_pb2.BulkRequest() - request.index = index - # All bulk requests here are index ops - op_container = document_pb2.OperationContainer() - op_container.index.CopyFrom(document_pb2.IndexOperation()) - for doc in doc_list: - request_body = document_pb2.BulkRequestBody() - request_body.object = doc.encode('utf-8') - request_body.operation_container.CopyFrom(op_container) - request.request_body.append(request_body) - return request - - # Parse stats from protobuf response. - # Consumed from params dictionary: - # ``index``: index name - # ``bulk-size``: documents per bulk request - # ``unit``: in the case of bulk always 'ops' - # ``detailed-results``: gRPC/Protobuf does not support detailed results at this time. - @staticmethod - def build_stats(response : document_pb2.BulkResponse, params): - if params.get("detailed-results"): - raise Exception("Detailed results not supported for gRPC bulk requests") - - took = None - error_count = 0 - success_count = 0 - if response.errors: - error_count = params.get("bulk-size") - else: - took = response.took - for item in response.items: - # status field mirrors http code conventions - # https://github.com/opensearch-project/opensearch-protobufs/blob/b6f889416da83b7dc4a0408347965e7820bd61d0/protos/schemas/document.proto#L217-L219 - if item.index.status > 299: - error_count += 1 - else: - success_count += 1 - - meta_data = { - "index": params.get("index"), - "weight": params.get("bulk-size"), - "unit": params.get("unit"), - "took": took, - "success": error_count == 0, - "success-count": success_count, - "error-count": error_count, - } - - if error_count > 0: - meta_data["error-type"] = "bulk" - - return meta_data diff --git a/osbenchmark/worker_coordinator/proto_helpers/ProtoQueryHelper.py b/osbenchmark/worker_coordinator/proto_helpers/ProtoQueryHelper.py deleted file mode 100644 index d4394873..00000000 --- a/osbenchmark/worker_coordinator/proto_helpers/ProtoQueryHelper.py +++ /dev/null @@ -1,206 +0,0 @@ -from opensearch.protobufs.schemas import search_pb2 -from opensearch.protobufs.schemas import common_pb2 - -# In some cases (KNN) we set stored fields explicitly to "_none_" to disable -# https://github.com/opensearch-project/OpenSearch/blob/3.3/server/src/main/java/org/opensearch/search/fetch/StoredFieldsContext.java#L59 -STORED_FIELDS_NONE = "_none_" - -def is_true(value): - if isinstance(value, str): - return value.lower() == "true" - return bool(value) - -def get_relation(relation): - match relation: - case 0: - return "TOTAL_HITS_RELATION_UNSPECIFIED" - case 1: - return "TOTAL_HITS_RELATION_EQ" - case 2: - return "TOTAL_HITS_RELATION_GTE" - case _: - return "TOTAL_HITS_RELATION_UNSET" - -def get_terms_dict(query): - terms = {} - for key, value in query.items(): - terms[key] = [] - if isinstance(value, list): - terms[key].extend(value) - elif isinstance(value, dict): - for _, term_value in value.items(): - terms[key].append(term_value) - else: - raise Exception("Error parsing query - Term(s) are neither list nor dictionary: " + str(query)) - return terms - -class ProtoQueryHelper: - # Parse term query into common_pb2.TermQuery protobuf. - # Term query supports a single term on single field. - @staticmethod - def term_query_to_proto(query): - term = get_terms_dict(query) - if len(term.keys()) > 1: - raise Exception("Error parsing query - Term query contains multiple distinct fields: " + str(query)) - if len(term.values()) > 1: - raise Exception("Error parsing query - Term query contains multiple terms: " + str(query)) - - # Term query body gives field/value as lists - term_field = next(iter(term.keys())) - term_value = next(iter(term[term_field])) - - if not isinstance(term_value, str): - raise Exception(f"Error parsing query - Term query field value is not a supported type: {term_value} (type: {type(term_value).__name__})") - - f_val = common_pb2.FieldValue(string=term_value) - return common_pb2.TermQuery( - field=term_field, - value=f_val - ) - - # Parse a query body into the corresponding protobuf type. - # Exceptions are thrown for unsupported query types. - # (Note that gRPC/protobuf API coverage is not comprehensive) - @staticmethod - def query_body_to_proto(body): - query_body = body.get("query") - for key, _ in query_body.items(): - if key == "match_all": - return common_pb2.QueryContainer( - match_all=common_pb2.MatchAllQuery() - ) - if key == "term": - return common_pb2.QueryContainer( - term=ProtoQueryHelper.term_query_to_proto(query_body.get("term")) - ) - raise Exception("Unsupported query type: " + str(query_body)) - - # Build protobuf SearchRequest. - # Consumed from params dictionary: - # ``body``: query body as loaded from workload - Contains `_size` and `source` - # ``index``: index name - # ``request-timeout``: request timeout - # ``cache``: enabled request cache - @staticmethod - def build_proto_request(params): - body = params.get("body") - size = body.get("size") if "size" in body else None - fetch_source = is_true(body.get("_source")) - source_config = common_pb2.SourceConfigParam(bool=fetch_source) - index = [params.get("index")] - timeout = None if params.get("request-timeout") is None else str(params.get("request-timeout")) + "ms" - - if isinstance(params.get("cache"), bool): - cache = params.get("cache") - elif isinstance(params.get("cache"), str): - cache = params.get("cache").lower() == "true" - else: - cache = None - - return search_pb2.SearchRequest( - request_body=search_pb2.SearchRequestBody( - query=ProtoQueryHelper.query_body_to_proto(body), - timeout=timeout, - size = size - ), - index=index, - x_source=source_config, - request_cache=cache - ) - - # Build protobuf SearchRequest for vector search workload. - # Vector search requests have a slightly different structure and provide additional params - # outside the query body. - # ``body``: knn query body - # ``index``: index name - # ``request-timeout``: request timeout - # ``cache``: enabled request cache - # ``request-params``: vector search lists _source here - @staticmethod - def build_vector_search_proto_request(params): - if is_true(params.get("detailed-results")): - raise NotImplementedError("Detailed results not supported for gRPC/protobuf vector search") - if is_true(params.get("calculate-recall")) or params.get("id-field-name"): - raise NotImplementedError("Recall calculations not supported for gRPC/protobuf vector search") - if is_true(params.get("response-compression-enabled")): - raise NotImplementedError("Compression not supported for gRPC/protobuf transport") - if params.get("type"): - raise NotImplementedError("Doc type not supported for knn query type") - if params.get("filter_body") or params.get("filter_type"): - raise NotImplementedError("Filter options not supported for gRPC/protobuf vector search") - - index = [params.get("index")] - body = params.get("body") - doc_value_fields = body.get("docvalue_fields") - size = body.get("size") if "size" in body else None - request_params = params.get("request-params") - fetch_source = is_true(request_params.get("_source")) - profile_query = is_true(params.get("profile_query")) - partial_results = is_true(request_params.get("allow_partial_search_results")) - source_config = common_pb2.SourceConfigParam(bool=fetch_source) - timeout = params.get("request-timeout") - - stored_fields = body.get("stored_fields") - if stored_fields is None or stored_fields == STORED_FIELDS_NONE: - stored_fields = [STORED_FIELDS_NONE] - elif not isinstance(stored_fields, list): - raise Exception("Error parsing query params - Stored fields must be a list") - - if isinstance(params.get("cache"), bool): - cache = params.get("cache") - elif isinstance(params.get("cache"), str): - cache = params.get("cache").lower() == "true" - else: - cache = None - - # Parse knn query into `common_pb2.KnnQuery` protobuf. - def knn_query_to_proto(query) -> common_pb2.KnnQuery: - knn_query = query.get("knn") - target_field_key = next(iter(knn_query.keys())) - vector = knn_query[target_field_key].get("vector") - k = knn_query[target_field_key].get("k") - return common_pb2.KnnQuery( - field=target_field_key, - vector=vector, - k=k - ) - - knn_query_proto = knn_query_to_proto(body.get("query")) - return search_pb2.SearchRequest( - request_body=search_pb2.SearchRequestBody( - query=common_pb2.QueryContainer(knn=knn_query_proto), - timeout=timeout, - profile=profile_query, - size = size - ), - index=index, - x_source=source_config, - request_cache=cache, - allow_partial_search_results=partial_results, - docvalue_fields=doc_value_fields, - stored_fields=stored_fields - ) - - # Parse stats from protobuf response. - # ``detailed-results``: return detailed results, hits, took, hits_relation - @staticmethod - def build_stats(response, params): - if not isinstance(response, search_pb2.SearchResponse): - raise Exception("Unknown response proto: " + response) - - if params.get("detailed-results"): - return { - "weight": 1, - "unit": "ops", - "success": True, - "hits": response.hits.total.total_hits.value, - "hits_relation": get_relation(response.hits.total.total_hits.relation), - "timed_out": response.timed_out, - "took": response.took, - } - - return { - "weight": 1, - "unit": "ops", - "success": True - } diff --git a/osbenchmark/worker_coordinator/proto_helpers/__init__.py b/osbenchmark/worker_coordinator/proto_helpers/__init__.py deleted file mode 100644 index e7a26e8e..00000000 --- a/osbenchmark/worker_coordinator/proto_helpers/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/samples/ccr/docker-compose-metricstore.yml b/samples/ccr/docker-compose-metricstore.yml deleted file mode 100644 index fabe2e9f..00000000 --- a/samples/ccr/docker-compose-metricstore.yml +++ /dev/null @@ -1,44 +0,0 @@ -# Creates an Opensearch cluster to publish the OpenSearch Benchmark metrics. -version: '3' -services: - metricstore-node: - image: opensearchproject/opensearch:latest - container_name: metricstore-node - environment: - - cluster.name=opensearch-metricstore-cluster - - node.name=metricstore-node - - discovery.seed_hosts=metricstore-node - - cluster.initial_master_nodes=metricstore-node - - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping - - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM - ulimits: - memlock: - soft: -1 - hard: -1 - nofile: - soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems - hard: 65536 - volumes: - - metricstore-volume:/usr/share/opensearch/data - - ports: - - 9209:9200 - networks: - - opensearch-net-metrics - opensearch-dashboards: - image: opensearchproject/opensearch-dashboards:latest - container_name: opensearch-dashboards - ports: - - 5601:5601 - expose: - - "5601" - environment: - OPENSEARCH_HOSTS: '["https://metricstore-node:9200"]' - networks: - - opensearch-net-metrics - -volumes: - metricstore-volume: - -networks: - opensearch-net-metrics: \ No newline at end of file diff --git a/samples/ccr/docker-compose.yml b/samples/ccr/docker-compose.yml deleted file mode 100644 index ab241b74..00000000 --- a/samples/ccr/docker-compose.yml +++ /dev/null @@ -1,59 +0,0 @@ -# Creates 2 single node Opensearch cluster(one leader and one follower). Leader cluster is used for ingesting the data which is then -# replicated to the follower cluster. -version: '3' -services: - leader-cluster: - image: opensearchproject/opensearch:latest - container_name: leader-cluster - environment: - - cluster.name=leader-cluster - - node.name=leader-node - - discovery.seed_hosts=leader-node - - cluster.initial_master_nodes=leader-node - - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping - - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM - ulimits: - memlock: - soft: -1 - hard: -1 - nofile: - soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems - hard: 65536 - volumes: - - leader-node-volume:/usr/share/opensearch/data - ports: - - 9200:9200 - - 9300:9300 # required for Performance Analyzer - networks: - - opensearch-net - follower-cluster: - image: opensearchproject/opensearch:latest - container_name: follower-cluster - environment: - - cluster.name=follower-cluster - - node.name=follower-node - - discovery.seed_hosts=follower-node - - cluster.initial_master_nodes=follower-node - - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping - - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM - ulimits: - memlock: - soft: -1 - hard: -1 - nofile: - soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems - hard: 65536 - volumes: - - follower-node-volume:/usr/share/opensearch/data - ports: - - 9201:9200 - - 9301:9300 # required for Performance Analyzer - networks: - - opensearch-net - -volumes: - leader-node-volume: - follower-node-volume: - -networks: - opensearch-net: \ No newline at end of file diff --git a/samples/ccr/start.sh b/samples/ccr/start.sh deleted file mode 100755 index 7417fb87..00000000 --- a/samples/ccr/start.sh +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env bash - -# This script can be used for creating a cross cluster replication setup between 2 domains and executing a benchmark run. -# Simply execute the script using ./start.sh under the python virtual environment(Refer DEVELOPER_GUIDE.md) -# -# Prerequisite: Docker installed locally. -# Steps: -# 1. Sets up 2 single node (leader and follower) clusters. -# 2. Starts a single node cluster for metrics store. We can use Kibana attached to the metric store cluster to see the metrics.. -# 3. Configures the seed nodes on the follower cluster and starts replication using autofollow pattern. -# 4. Runs the eventdata benchmark on the replication setup. OSB metrics can be seen on the Kiabana. -# 5. To tear down everything, execute ./stop.sh. -set -e - - -# Start Opensearch -docker-compose up -d --remove-orphans - -# Start metrics store -docker-compose -f ./docker-compose-metricstore.yml up -d - -printf "Waiting for clusters to get ready " - -# Wait until OS is up -ALL_CLUSTERS_READY=false - -while ! $ALL_CLUSTERS_READY; do - (curl -ks -u admin:admin https://localhost:9200 -o /dev/null && curl -ks -u admin:admin https://localhost:9201 -o /dev/null && ALL_CLUSTERS_READY=true) || (printf "." && sleep 5) -done - -echo - -# Configure the seed nodes on follower cluster -# TODO: Update the seed node to private IP. -echo "Configure remotes on follower" -curl -o /dev/null -H 'Content-Type: application/json' -k -u admin:admin -X PUT https://localhost:9201/_cluster/settings -d @- <<-EOF - { - "persistent" : { - "cluster" : { - "remote" : { - "source" : { - "seeds" : [ - "127.0.0.1:9300" - ] - } - } - } - } - } -EOF - -echo "Set auto-follow pattern on follower for every index on leader" -curl -H 'Content-Type: application/json' -k -u admin:admin https://localhost:9201/_plugins/_replication/_autofollow -d @- <<-EOF -{ - "leader_alias": "source", - "name": "all", - "pattern": "eventdata*", - "use_roles": { - "leader_cluster_role": "all_access", - "follower_cluster_role": "all_access" - } -} -EOF - - -# Create target-hosts file for OSB. -cat >ccr-target-hosts.json <<'EOF' -{ - "default": [ - "https://127.0.0.1:9200" - ], - "follower": [ - "https://127.0.0.1:9201" - ] -} -EOF - -cat >ccr-telemetry-param.json <<'EOF' -{ - "ccr-stats-sample-interval": 1, - "ccr-stats-indices": { - "follower": ["eventdata"] - }, - "ccr-max-replication-lag-seconds": 36000 -} -EOF - -cat >ccr-client-options.json <<'EOF' -{ - "default": { - "use_ssl":"true", - "basic_auth_user":"admin", - "basic_auth_password":"admin", - "verify_certs":"false" - }, - "follower": { - "use_ssl":"true", - "basic_auth_user":"admin", - "basic_auth_password":"admin", - "verify_certs":"false" - } -} -EOF - - -# Create metricstore ini file -cat >${HOME}/.benchmark/benchmark.ini <<EOF -[meta] -config.version = 17 - -[system] -env.name = local - -[node] -root.dir = ${HOME}/.benchmark/benchmarks -src.root.dir = ${HOME}/.benchmark/benchmarks/src - -[source] -remote.repo.url = https://github.com/opensearch-project/OpenSearch.git -opensearch.src.subdir = opensearch - -[benchmarks] -local.dataset.cache = ${HOME}/.benchmark/benchmarks/data - -[reporting] -datastore.type = opensearch -datastore.host = 127.0.0.1 -datastore.port = 9209 -datastore.secure = True -datastore.user = admin -datastore.password = admin - - -[workloads] -default.url = https://github.com/opensearch-project/opensearch-benchmark-workloads - -[cluster_configs] -default.dir = default-cluster-config - -[defaults] -preserve_benchmark_candidate = false - -[distributions] -release.cache = true - -EOF - - -# Start OpenSearch Benchmark -opensearch-benchmark run --configuration-name=metricstore --workload=geonames --target-hosts=./ccr-target-hosts.json --pipeline=benchmark-only --workload-params="number_of_replicas:1" --client-options=./ccr-client-options.json --kill-running-processes --telemetry="ccr-stats" --telemetry-params=./ccr-telemetry-param.json \ No newline at end of file diff --git a/samples/ccr/stop.sh b/samples/ccr/stop.sh deleted file mode 100755 index 78021e08..00000000 --- a/samples/ccr/stop.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env bash - -# Execute this script `./stop.sh` to stop the docker containers for leader cluster, follower cluster and metricstore. -docker-compose down -v -docker-compose -f docker-compose-metricstore.yml down -v \ No newline at end of file diff --git a/scripts/pbzip2 b/scripts/pbzip2 deleted file mode 100755 index f43fd65d..00000000 --- a/scripts/pbzip2 +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env sh - -dir=`python3 -c 'import osbenchmark; print(osbenchmark.__path__[0])'` -os=`uname` -arch=`uname -m` - -exec $dir/decompressors/pbzip2-$os-$arch "$@" - diff --git a/scripts/terraform/.gitignore b/scripts/terraform/.gitignore deleted file mode 100644 index 719891de..00000000 --- a/scripts/terraform/.gitignore +++ /dev/null @@ -1,38 +0,0 @@ -# Official Github .gitignore for Terraform: https://github.com/github/gitignore/blob/5445a270254a0b4443b20ed0033c4094959f937e/Terraform.gitignore -# Local .terraform directories -**/.terraform/* - -# .tfstate files -*.tfstate -*.tfstate.* - -# Crash log files -crash.log -crash.*.log - -# Exclude all .tfvars files, which are likely to contain sensitive data, such as -# password, private keys, and other secrets. These should not be part of version -# control as they are data points which are potentially sensitive and subject -# to change depending on the environment. -*.tfvars -*.tfvars.json - -# Ignore override files as they are usually used to override resources locally and so -# are not checked in -override.tf -override.tf.json -*_override.tf -*_override.tf.json - -# Ignore transient lock info files created by terraform apply -.terraform.tfstate.lock.info - -# Include override files you do wish to add to version control using negated pattern -# !example_override.tf - -# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan -# example: *tfplan* - -# Ignore CLI configuration files -.terraformrc -terraform.rc \ No newline at end of file diff --git a/scripts/terraform/.terraform.lock.hcl b/scripts/terraform/.terraform.lock.hcl deleted file mode 100644 index b43865af..00000000 --- a/scripts/terraform/.terraform.lock.hcl +++ /dev/null @@ -1,103 +0,0 @@ -# This file is maintained automatically by "terraform init". -# Manual edits may be lost in future updates. - -provider "registry.terraform.io/hashicorp/aws" { - version = "5.65.0" - constraints = "5.65.0" - hashes = [ - "h1:OG8xMZjGZL/OtEV9OwX0CTPcUzvSfcfiB0X9lcs2joY=", - "zh:036f8557c8c9b58656e1ec08ed5702e44bd338fda17dc4b2add40b234102e29a", - "zh:0ba0708ece98735540070899a916b7a90c5c887be31ffd693ee1359e40245978", - "zh:12d82a82ae0e3bc580f2be961078e89d129e12df7dd82a6ec610a2b945bba1a4", - "zh:1ed0ee17df8807aef64976e2a4276d2a3e1d54efeae2a86f596d12eccb94dc83", - "zh:36b7c61a83d24f612156b4648027ba8bd5727f0ed57183cbad0e6c93b7503aa2", - "zh:496d06a089b1bc8d60995e8dddfe1d87c605a208f377a60b17987e89381dafda", - "zh:4e9aba435994589befe4279927c71a461a52e6cd96b8f0437295c18c50f6baff", - "zh:71134031288a312db1804d4798b10f106a843c36aafd7b8fe8f4859156d7df93", - "zh:748d0dbdfbe8df4b516a09b23b3981c19cef9a255c1ca0187e84ab424e6bd845", - "zh:783541ff77f4e7c74c817e0e2989ebdb45dd6e2c9853a8cccbcf5f1976736a76", - "zh:9b12af85486a96aedd8d7984b0ff811a4b42e3d88dad1a3fb4c0b580d04fa425", - "zh:af3f080975d5ed79917b8238cc0ae3150da688bc89e12dcc3ee85134b29857d0", - "zh:ec542372c3ffbfc3df6966f77357f8af7319d4bd956ff8e9fde0bbd124352e34", - "zh:f3dc7b2b5b55173207c2fd35ed6bb8cc66b06af777e221060ca2f0c0afdecbb5", - "zh:f9631ecc21d6e5cf82ef6ef8d14c39e1dfb2a52cc8f0abb684311885ffdb79a1", - ] -} - -provider "registry.terraform.io/hashicorp/external" { - version = "2.3.4" - constraints = "2.3.4" - hashes = [ - "h1:cCabxnWQ5fX1lS7ZqgUzsvWmKZw9FA7NRxAZ94vcTcc=", - "zh:037fd82cd86227359bc010672cd174235e2d337601d4686f526d0f53c87447cb", - "zh:0ea1db63d6173d01f2fa8eb8989f0809a55135a0d8d424b08ba5dabad73095fa", - "zh:17a4d0a306566f2e45778fbac48744b6fd9c958aaa359e79f144c6358cb93af0", - "zh:298e5408ab17fd2e90d2cd6d406c6d02344fe610de5b7dae943a58b958e76691", - "zh:38ecfd29ee0785fd93164812dcbe0664ebbe5417473f3b2658087ca5a0286ecb", - "zh:59f6a6f31acf66f4ea3667a555a70eba5d406c6e6d93c2c641b81d63261eeace", - "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", - "zh:ad0279dfd09d713db0c18469f585e58d04748ca72d9ada83883492e0dd13bd58", - "zh:c69f66fd21f5e2c8ecf7ca68d9091c40f19ad913aef21e3ce23836e91b8cbb5f", - "zh:d4a56f8c48aa86fc8e0c233d56850f5783f322d6336f3bf1916e293246b6b5d4", - "zh:f2b394ebd4af33f343835517e80fc876f79361f4688220833bc3c77655dd2202", - "zh:f31982f29f12834e5d21e010856eddd19d59cd8f449adf470655bfd19354377e", - ] -} - -provider "registry.terraform.io/hashicorp/local" { - version = "2.5.3" - hashes = [ - "h1:MCzg+hs1/ZQ32u56VzJMWP9ONRQPAAqAjuHuzbyshvI=", - "zh:284d4b5b572eacd456e605e94372f740f6de27b71b4e1fd49b63745d8ecd4927", - "zh:40d9dfc9c549e406b5aab73c023aa485633c1b6b730c933d7bcc2fa67fd1ae6e", - "zh:6243509bb208656eb9dc17d3c525c89acdd27f08def427a0dce22d5db90a4c8b", - "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", - "zh:885d85869f927853b6fe330e235cd03c337ac3b933b0d9ae827ec32fa1fdcdbf", - "zh:bab66af51039bdfcccf85b25fe562cbba2f54f6b3812202f4873ade834ec201d", - "zh:c505ff1bf9442a889ac7dca3ac05a8ee6f852e0118dd9a61796a2f6ff4837f09", - "zh:d36c0b5770841ddb6eaf0499ba3de48e5d4fc99f4829b6ab66b0fab59b1aaf4f", - "zh:ddb6a407c7f3ec63efb4dad5f948b54f7f4434ee1a2607a49680d494b1776fe1", - "zh:e0dafdd4500bec23d3ff221e3a9b60621c5273e5df867bc59ef6b7e41f5c91f6", - "zh:ece8742fd2882a8fc9d6efd20e2590010d43db386b920b2a9c220cfecc18de47", - "zh:f4c6b3eb8f39105004cf720e202f04f57e3578441cfb76ca27611139bc116a82", - ] -} - -provider "registry.terraform.io/hashicorp/random" { - version = "3.6.2" - constraints = "3.6.2" - hashes = [ - "h1:VavG5unYCa3SYISMKF9pzc3718M0bhPlcbUZZGl7wuo=", - "zh:0ef01a4f81147b32c1bea3429974d4d104bbc4be2ba3cfa667031a8183ef88ec", - "zh:1bcd2d8161e89e39886119965ef0f37fcce2da9c1aca34263dd3002ba05fcb53", - "zh:37c75d15e9514556a5f4ed02e1548aaa95c0ecd6ff9af1119ac905144c70c114", - "zh:4210550a767226976bc7e57d988b9ce48f4411fa8a60cd74a6b246baf7589dad", - "zh:562007382520cd4baa7320f35e1370ffe84e46ed4e2071fdc7e4b1a9b1f8ae9b", - "zh:5efb9da90f665e43f22c2e13e0ce48e86cae2d960aaf1abf721b497f32025916", - "zh:6f71257a6b1218d02a573fc9bff0657410404fb2ef23bc66ae8cd968f98d5ff6", - "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", - "zh:9647e18f221380a85f2f0ab387c68fdafd58af6193a932417299cdcae4710150", - "zh:bb6297ce412c3c2fa9fec726114e5e0508dd2638cad6a0cb433194930c97a544", - "zh:f83e925ed73ff8a5ef6e3608ad9225baa5376446349572c2449c0c0b3cf184b7", - "zh:fbef0781cb64de76b1df1ca11078aecba7800d82fd4a956302734999cfd9a4af", - ] -} - -provider "registry.terraform.io/hashicorp/tls" { - version = "4.1.0" - hashes = [ - "h1:zEv9tY1KR5vaLSyp2lkrucNJ+Vq3c+sTFK9GyQGLtFs=", - "zh:14c35d89307988c835a7f8e26f1b83ce771e5f9b41e407f86a644c0152089ac2", - "zh:2fb9fe7a8b5afdbd3e903acb6776ef1be3f2e587fb236a8c60f11a9fa165faa8", - "zh:35808142ef850c0c60dd93dc06b95c747720ed2c40c89031781165f0c2baa2fc", - "zh:35b5dc95bc75f0b3b9c5ce54d4d7600c1ebc96fbb8dfca174536e8bf103c8cdc", - "zh:38aa27c6a6c98f1712aa5cc30011884dc4b128b4073a4a27883374bfa3ec9fac", - "zh:51fb247e3a2e88f0047cb97bb9df7c228254a3b3021c5534e4563b4007e6f882", - "zh:62b981ce491e38d892ba6364d1d0cdaadcee37cc218590e07b310b1dfa34be2d", - "zh:bc8e47efc611924a79f947ce072a9ad698f311d4a60d0b4dfff6758c912b7298", - "zh:c149508bd131765d1bc085c75a870abb314ff5a6d7f5ac1035a8892d686b6297", - "zh:d38d40783503d278b63858978d40e07ac48123a2925e1a6b47e62179c046f87a", - "zh:f569b65999264a9416862bca5cd2a6177d94ccb0424f3a4ef424428912b9cb3c", - "zh:fb07f708e3316615f6d218cec198504984c0ce7000b9f1eebff7516e384f4b54", - ] -} diff --git a/scripts/terraform/PROVISIONING_CLUSTER.md b/scripts/terraform/PROVISIONING_CLUSTER.md deleted file mode 100644 index 4dd5e34c..00000000 --- a/scripts/terraform/PROVISIONING_CLUSTER.md +++ /dev/null @@ -1,56 +0,0 @@ -# Provisioning Cluster - -## Setup -### Environment Setup -- Install `terraform`. [Installation Guide](https://developer.hashicorp.com/terraform/tutorials/aws-get-started/install-cli) -- Install AWS CLI -- In the AWS Console, go to "Security Credentials" and create a new "Access Key" -- Set the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` -- Copy `terraform.tfvars.example` to `terraform.tfvars`. -- Configure `terraform.tfvars`. Common variables to configure are: - - `aws_region`, `aws_subnet_zone`: Specify where AWS infrastructure is deployed -- `terraform init` -- `terraform workspace new <unique-name>` (e.g. `terraform workspace new rschirone`) - -## Usage -First run `terraform plan` to confirm that the appropriate resources will be created - -Run `terraform apply` to deploy infrastructure. - -To specify alternative workloads/parameters, you can run: - - `terraform apply -var="workload=pmc" -var="workload_params=$(cat workload_params_default/pmc.json)"` - - or`terraform apply -var-file=my-terraform.tfvars` if you have a different `tfvars` file. - -The Terraform script is going to create several AWS EC2 instances. A `target-cluster` instance is used to host the product being benchmarked (e.g. OpenSearch). There may be additional cluster instances if the workload uses a multi-node deployment. - -Use `terraform output` to get the IPs/hostnames of the instances. - -Use `terraform output cluster-password` to get the password for the cluster. - -## Destroy Instances - -```shell -terraform destroy -``` - -## Identify AWS Resources in Use - -```shell -./scripts/resources.sh -``` - -## Resources that should be created - -### `provision-cluster.tf` Resources - -* `aws_key_pair`: Key pair for providing access to the ec2 instance -* `aws_vpc`: The virtual private cloud -* `aws_subnet`: The range ips for the VPC -* `aws_internet_gateway`: Provides path for traffic to VPC -* `aws_security_group`: Configures rules for VPC traffic. Also make sure that the ingress and egress rules are configured -* `aws_route_table`: Map for directing the network traffic -* `aws_route_table_association`: Defines how the route table directs its outgoing traffic - -### `opensearch.tf` Module Resources - -* One aws ec2 instances should be created for a single-node cluster. diff --git a/scripts/terraform/modules/opensearch/jvm.v2.options b/scripts/terraform/modules/opensearch/jvm.v2.options deleted file mode 100644 index 51cbc3e6..00000000 --- a/scripts/terraform/modules/opensearch/jvm.v2.options +++ /dev/null @@ -1,93 +0,0 @@ -## JVM configuration - -################################################################ -## IMPORTANT: JVM heap size -################################################################ -## -## You should always set the min and max JVM heap -## size to the same value. For example, to set -## the heap to 4 GB, set: -## -## -Xms4g -## -Xmx4g -## -## See https://opensearch.org/docs/opensearch/install/important-settings/ -## for more information -## -################################################################ - -# Xms represents the initial size of total heap space -# Xmx represents the maximum size of total heap space - --Xms1g --Xmx1g - -################################################################ -## Expert settings -################################################################ -## -## All settings below this section are considered -## expert settings. Don't tamper with them unless -## you understand what you are doing -## -################################################################ - -## GC configuration -8-10:-XX:+UseConcMarkSweepGC -8-10:-XX:CMSInitiatingOccupancyFraction=75 -8-10:-XX:+UseCMSInitiatingOccupancyOnly - -## G1GC Configuration -# NOTE: G1GC is the default GC for all JDKs 11 and newer -11-:-XX:+UseG1GC -# See https://github.com/elastic/elasticsearch/pull/46169 for the history -# behind these settings, but the tl;dr is that default values can lead -# to situations where heap usage grows enough to trigger a circuit breaker -# before GC kicks in. -11-:-XX:G1ReservePercent=25 -11-:-XX:InitiatingHeapOccupancyPercent=30 - -## JVM temporary directory --Djava.io.tmpdir=${OPENSEARCH_TMPDIR} - -## heap dumps - -# generate a heap dump when an allocation from the Java heap fails -# heap dumps are created in the working directory of the JVM --XX:+HeapDumpOnOutOfMemoryError - -# specify an alternative path for heap dumps; ensure the directory exists and -# has sufficient space --XX:HeapDumpPath=data - -# specify an alternative path for JVM fatal error logs --XX:ErrorFile=logs/hs_err_pid%p.log - -## JDK 8 GC logging -8:-XX:+PrintGCDetails -8:-XX:+PrintGCDateStamps -8:-XX:+PrintTenuringDistribution -8:-XX:+PrintGCApplicationStoppedTime -8:-Xloggc:logs/gc.log -8:-XX:+UseGCLogFileRotation -8:-XX:NumberOfGCLogFiles=32 -8:-XX:GCLogFileSize=64m - -# JDK 9+ GC logging -9-:-Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m - -# Explicitly allow security manager (https://bugs.openjdk.java.net/browse/JDK-8270380) -18-:-Djava.security.manager=allow - -# JDK 20+ Incubating Vector Module for SIMD optimizations; -# disabling may reduce performance on vector optimized lucene -20-:--add-modules=jdk.incubator.vector - -# HDFS ForkJoinPool.common() support by SecurityManager --Djava.util.concurrent.ForkJoinPool.common.threadFactory=org.opensearch.secure_sm.SecuredForkJoinWorkerThreadFactory - -## OpenSearch Performance Analyzer --Dclk.tck=100 --Djdk.attach.allowAttachSelf=true --Djava.security.policy=/mnt/opensearch/opensearch-2.19.1/config/opensearch-performance-analyzer/opensearch_security.policy ---add-opens=jdk.attach/sun.tools.attach=ALL-UNNAMED \ No newline at end of file diff --git a/scripts/terraform/modules/opensearch/jvm.v3.options b/scripts/terraform/modules/opensearch/jvm.v3.options deleted file mode 100644 index f74751d3..00000000 --- a/scripts/terraform/modules/opensearch/jvm.v3.options +++ /dev/null @@ -1,88 +0,0 @@ -## JVM configuration - -################################################################ -## IMPORTANT: JVM heap size -################################################################ -## -## You should always set the min and max JVM heap -## size to the same value. For example, to set -## the heap to 4 GB, set: -## -## -Xms4g -## -Xmx4g -## -## See https://opensearch.org/docs/opensearch/install/important-settings/ -## for more information -## -################################################################ - -# Xms represents the initial size of total heap space -# Xmx represents the maximum size of total heap space - --Xms1g --Xmx1g - -################################################################ -## Expert settings -################################################################ -## -## All settings below this section are considered -## expert settings. Don't tamper with them unless -## you understand what you are doing -## -################################################################ - -## GC configuration -8-10:-XX:+UseConcMarkSweepGC -8-10:-XX:CMSInitiatingOccupancyFraction=75 -8-10:-XX:+UseCMSInitiatingOccupancyOnly - -## G1GC Configuration -# NOTE: G1GC is the default GC for all JDKs 11 and newer -11-:-XX:+UseG1GC -# See https://github.com/elastic/elasticsearch/pull/46169 for the history -# behind these settings, but the tl;dr is that default values can lead -# to situations where heap usage grows enough to trigger a circuit breaker -# before GC kicks in. -11-:-XX:G1ReservePercent=25 -11-:-XX:InitiatingHeapOccupancyPercent=30 - -## JVM temporary directory --Djava.io.tmpdir=${OPENSEARCH_TMPDIR} - -## heap dumps - -# generate a heap dump when an allocation from the Java heap fails -# heap dumps are created in the working directory of the JVM --XX:+HeapDumpOnOutOfMemoryError - -# specify an alternative path for heap dumps; ensure the directory exists and -# has sufficient space --XX:HeapDumpPath=data - -# specify an alternative path for JVM fatal error logs --XX:ErrorFile=logs/hs_err_pid%p.log - -## JDK 8 GC logging -8:-XX:+PrintGCDetails -8:-XX:+PrintGCDateStamps -8:-XX:+PrintTenuringDistribution -8:-XX:+PrintGCApplicationStoppedTime -8:-Xloggc:logs/gc.log -8:-XX:+UseGCLogFileRotation -8:-XX:NumberOfGCLogFiles=32 -8:-XX:GCLogFileSize=64m - -# JDK 9+ GC logging -9-:-Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m - -# JDK 20+ Incubating Vector Module for SIMD optimizations; -# disabling may reduce performance on vector optimized lucene -20-:--add-modules=jdk.incubator.vector - -# See please https://bugs.openjdk.org/browse/JDK-8341127 (openjdk/jdk#21283) -23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache -23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached - -21-:-javaagent:agent/opensearch-agent.jar -21-:--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED \ No newline at end of file diff --git a/scripts/terraform/modules/opensearch/opensearch.tf b/scripts/terraform/modules/opensearch/opensearch.tf deleted file mode 100644 index b6833d98..00000000 --- a/scripts/terraform/modules/opensearch/opensearch.tf +++ /dev/null @@ -1,131 +0,0 @@ -locals { - default_cluster_arch = "x64" - cluster_arch = local.default_cluster_arch -} - -terraform { - required_providers { - aws = { - source = "hashicorp/aws" - version = "5.65.0" - } - } -} - -data "aws_caller_identity" "current" {} - -locals { - # start at 4 because first 4 addresses are reserved for AWS - cluster_node_private_ips = [ - cidrhost(var.subnet_cidr_block, 4), - cidrhost(var.subnet_cidr_block, 5), - cidrhost(var.subnet_cidr_block, 6) - ] - main_cluster_node_private_ip = local.cluster_node_private_ips[0] - nodes_type = "single" - additional_nodes_idx = 3 - additional_cluster_node_private_ips = slice(local.cluster_node_private_ips, local.additional_nodes_idx, 3) -} - - -resource "aws_instance" "target-cluster-additional-nodes" { - for_each = toset(local.additional_cluster_node_private_ips) - ami = var.cluster_ami_id - instance_type = var.cluster_instance_type - key_name = var.ssh_key_name - vpc_security_group_ids = var.security_groups - - associate_public_ip_address = true - - subnet_id = var.subnet_id - - private_ip = each.key - - user_data = templatefile("${path.module}/os-cluster.yaml", - { - os_cluster_script = yamlencode(base64gzip(file("${path.module}/os_cluster.sh"))), - os_password = var.password, - os_version = var.os_version, - os_arch = local.cluster_arch, - authorized_ssh_key = var.ssh_pub_key, - jvm_options_2 = yamlencode(base64gzip(file("${path.module}/jvm.v2.options"))), - jvm_options_3 = yamlencode(base64gzip(file("${path.module}/jvm.v3.options"))), - cluster_ips = join(",", local.cluster_node_private_ips), - node_name = format("node-%s", each.key), - nodes_type = local.nodes_type, - } - ) - user_data_replace_on_change = true - - private_dns_name_options { - hostname_type = "resource-name" - } - - provisioner "remote-exec" { - inline = [ - "echo 'Waiting for user data script to finish'", - "cloud-init status --wait > /dev/null", - "echo 'User data script finished'", - ] - } - - connection { - type = "ssh" - user = "ubuntu" - private_key = var.ssh_priv_key - host = self.public_ip - } - - tags = var.tags -} - -resource "aws_instance" "target-cluster-main-node" { - ami = var.cluster_ami_id - instance_type = var.cluster_instance_type - key_name = var.ssh_key_name - vpc_security_group_ids = var.security_groups - - associate_public_ip_address = true - - subnet_id = var.subnet_id - - private_ip = local.main_cluster_node_private_ip - - user_data = templatefile("${path.module}/os-cluster.yaml", - { - os_cluster_script = yamlencode(base64gzip(file("${path.module}/os_cluster.sh"))), - os_password = var.password, - os_version = var.os_version, - os_arch = local.cluster_arch, - authorized_ssh_key = var.ssh_pub_key, - jvm_options_2 = yamlencode(base64gzip(file("${path.module}/jvm.v2.options"))), - jvm_options_3 = yamlencode(base64gzip(file("${path.module}/jvm.v3.options"))), - cluster_ips = join(",", local.cluster_node_private_ips), - node_name = "main-node", - nodes_type = local.nodes_type, - } - ) - user_data_replace_on_change = true - - private_dns_name_options { - hostname_type = "resource-name" - } - - provisioner "remote-exec" { - inline = [ - "echo 'Waiting for user data script to finish'", - "cloud-init status --wait > /dev/null", - "echo 'User data script finished'", - ] - } - - connection { - type = "ssh" - user = "ubuntu" - private_key = var.ssh_priv_key - host = self.public_ip - } - - tags = var.tags - depends_on = [aws_instance.target-cluster-additional-nodes] -} \ No newline at end of file diff --git a/scripts/terraform/modules/opensearch/os-cluster.yaml b/scripts/terraform/modules/opensearch/os-cluster.yaml deleted file mode 100644 index 09977a61..00000000 --- a/scripts/terraform/modules/opensearch/os-cluster.yaml +++ /dev/null @@ -1,55 +0,0 @@ -#cloud-config - -hostname: ${node_name} -fqdn: ${node_name} - -write_files: - - path: /etc/sysctl.d/99-custom.conf - content: | - vm.max_map_count=262144 - owner: root:root - permissions: '0644' - - path: /os_cluster.sh - encoding: gz+b64 - content: ${os_cluster_script} - owner: root:root - permissions: '0755' - - path: /ssh_pub_key - content: ${authorized_ssh_key} - owner: root:root - permissions: '0644' - - path: /jvm.v2.options - encoding: gz+b64 - content: | - ${jvm_options_2} - owner: root:root - permissions: '0644' - - path: /jvm.v3.options - encoding: gz+b64 - content: | - ${jvm_options_3} - owner: root:root - permissions: '0644' - -fs_setup: - - label: None - filesystem: xfs - device: /dev/nvme1n1 - partition: none - overwrite: true - -mounts: - - [ /dev/nvme1n1, /mnt, xfs, "defaults", "0", "0" ] - -bootcmd: - - timeout 30s sh -c 'while [ ! -e /dev/nvme1n1 ]; do sleep 1; done' - - swapoff -a - - ufw allow 9200/tcp - - ufw allow 9300/tcp - -runcmd: - - [ sysctl, -p, /etc/sysctl.d/99-custom.conf ] - - [ chown, -R, ubuntu:ubuntu, /mnt ] - - [ mv, /ssh_pub_key, /home/ubuntu/.ssh/authorized_keys ] - - [ chown, ubuntu:ubuntu, /home/ubuntu/.ssh/authorized_keys ] - - [ sudo, -u, ubuntu, /os_cluster.sh, "${os_password}", "${os_version}", "${os_arch}", "${cluster_ips}", "${node_name}", "${nodes_type}" ] \ No newline at end of file diff --git a/scripts/terraform/modules/opensearch/os_cluster.sh b/scripts/terraform/modules/opensearch/os_cluster.sh deleted file mode 100644 index 93384e1a..00000000 --- a/scripts/terraform/modules/opensearch/os_cluster.sh +++ /dev/null @@ -1,130 +0,0 @@ -#!/bin/bash - -CLUSTER_PASSWORD=$1 -CLUSTER_VERSION=$2 -CLUSTER_ARCH=$3 -#OS_SNAPSHOT_AWS_ACCESS_KEY_ID=$4 -#OS_SNAPSHOT_AWS_SECRET_ACCESS_KEY=$5 -#CLUSTER_IPS=$6 -#NODE_NAME=$7 -#NODES_TYPE=$8 -CLUSTER_IPS=$4 -NODE_NAME=$5 -NODES_TYPE=$6 - -# Check if the cluster version is a nightly one -if [[ $CLUSTER_VERSION == *"-nightly-"* ]]; then - IS_NIGHTLY=true - NIGHTLY_VERSION="${CLUSTER_VERSION#*-nightly-}" - CLUSTER_VERSION="${CLUSTER_VERSION/-nightly*/}" - echo "Downloading nightly version $CLUSTER_VERSION of OpenSearch" -else - echo "Downloading version $CLUSTER_VERSION of OpenSearch" -fi - -INSTALL_ROOT=/mnt/opensearch -INSTALL_PATH=$INSTALL_ROOT/opensearch-$CLUSTER_VERSION -INSTALL_FILENAME=opensearch-$CLUSTER_VERSION-linux-$CLUSTER_ARCH.tar.gz -# If it's a nightly version, download it from the nightly repository -if [[ $IS_NIGHTLY == true ]]; then - DOWNLOAD_URL=https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/$CLUSTER_VERSION/$NIGHTLY_VERSION/linux/$CLUSTER_ARCH/tar/dist/opensearch/$INSTALL_FILENAME - - S3_INSTALL_FILENAME=repository-s3-$CLUSTER_VERSION.zip - S3_PLUGIN_URL=https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/$CLUSTER_VERSION/$NIGHTLY_VERSION/linux/$CLUSTER_ARCH/tar/builds/opensearch/core-plugins/$S3_INSTALL_FILENAME -else - DOWNLOAD_URL=https://artifacts.opensearch.org/releases/bundle/opensearch/$CLUSTER_VERSION/$INSTALL_FILENAME -fi -CONFIG_FILE=$INSTALL_PATH/config/opensearch.yml -JVM_CONFIG=$INSTALL_PATH/config/jvm.options - -cd /mnt || exit 1 - -# Download and install OpenSearch then remove installer -mkdir -p $INSTALL_PATH -wget $DOWNLOAD_URL -tar -xvf $INSTALL_FILENAME -C $INSTALL_ROOT -rm $INSTALL_FILENAME - -# Specify directories for storage and update the configuration to allow incoming connections. -# Also a config that is needed to make the s3 client successfully locate the snapshot bucket -cat <<EOF > $CONFIG_FILE -network.host: 0.0.0.0 -node.name: $NODE_NAME -path.repo: ["/mnt/backup"] -path.data: /mnt/data -path.logs: /mnt/logs -s3.client.default.region: us-east-1 -indices.recovery.max_bytes_per_sec: 2048mb -search.concurrent_segment_search.mode: auto -EOF - -if [[ "$NODES_TYPE" == "multi" ]]; then - # multi-node settings - cat <<EOF >> $CONFIG_FILE -cluster.initial_cluster_manager_nodes: main-node -discovery.seed_hosts: [$CLUSTER_IPS] -EOF -else - # single node settings - echo "discovery.type: single-node" >> $CONFIG_FILE -fi - -# Replace the JVM options file with the correct one for the OS version -CURRENT_OS_VERSION=$(echo "$CLUSTER_VERSION" | cut -d. -f1) -JVM_VERSION_CONFIG="/jvm.v$CURRENT_OS_VERSION.options" -echo "Copying JVM options file $JVM_VERSION_CONFIG to $JVM_CONFIG" -cp $JVM_VERSION_CONFIG $JVM_CONFIG - -sudo mkdir /mnt/backup && sudo chmod ugo+rwx /mnt/backup -sudo mkdir /mnt/data && sudo chmod ugo+rwx /mnt/data -sudo mkdir /mnt/logs && sudo chmod ugo+rwx /mnt/logs - - -# JDK location -export OPENSEARCH_JAVA_HOME=$INSTALL_PATH/jdk -echo "export OPENSEARCH_JAVA_HOME=$OPENSEARCH_JAVA_HOME" >> ~/.bashrc - -# Fix the JVM size -GB=$(echo "$(cat /proc/meminfo | grep MemTotal | awk '{print $2}') / (1024*1024*2)" | bc) -sed -i "s/-Xms1g/-Xms${GB}g/" $JVM_CONFIG -sed -i "s/-Xmx1g/-Xmx${GB}g/" $JVM_CONFIG - -# Install the s3 plugin if necessary (S3_PLUGIN_URL and S3_INSTALL_FILENAME is set) -if [[ -n "$S3_PLUGIN_URL" ]]; then - wget $S3_PLUGIN_URL - sudo $INSTALL_PATH/bin/opensearch-plugin install -b file:///mnt/$S3_INSTALL_FILENAME -else - sudo $INSTALL_PATH/bin/opensearch-plugin install -b -s repository-s3 -fi - -if [[ "$CURRENT_OS_VERSION" == "2" ]]; then - # Manually run security demo config to modify it - OPENSEARCH_INITIAL_ADMIN_PASSWORD=$CLUSTER_PASSWORD bash $INSTALL_PATH/plugins/opensearch-security/tools/install_demo_configuration.sh -y -i -s || exit 1 - # Set allowed TLS protocols to fix: https://github.com/opensearch-project/security/issues/3299 - echo 'plugins.security.ssl.http.enabled_protocols: ["TLSv1.2"]' >> $CONFIG_FILE -fi - -# Run opensearch startup script with security demo configuration -OPENSEARCH_INITIAL_ADMIN_PASSWORD=$CLUSTER_PASSWORD $INSTALL_PATH/opensearch-tar-install.sh &> opensearch.log & -SERVER_PID=$! - -# Record the pid -echo $SERVER_PID > /mnt/pid - -echo "Waiting for server to boot" -# Wait for OpenSearch to start (break after 20 tries) -tries=0 -while ! curl --max-time 5 -ks https://localhost:9200 > /dev/null 2>&1 ; do - echo "Waiting for OpenSearch to start ($tries)" - ((tries++)) - sleep $tries - if [ $tries -eq 20 ]; then - echo "Failed to start OpenSearch" - exit 1 - fi -done - -echo "OpenSearch responds on port 9200, now verify credentials" -curl -X GET https://localhost:9200 -u "admin:$CLUSTER_PASSWORD" --insecure || (echo "Failed to query server" && false) -echo -echo "Server up and running (pid $SERVER_PID)" \ No newline at end of file diff --git a/scripts/terraform/modules/opensearch/outputs.tf b/scripts/terraform/modules/opensearch/outputs.tf deleted file mode 100644 index 6b180300..00000000 --- a/scripts/terraform/modules/opensearch/outputs.tf +++ /dev/null @@ -1,7 +0,0 @@ -output "os-cluster-ip" { - value = aws_instance.target-cluster-main-node.public_dns -} - -output "os-additional-cluster-ips" { - value = [for resource in aws_instance.target-cluster-additional-nodes : resource.public_dns] -} diff --git a/scripts/terraform/modules/opensearch/variables.tf b/scripts/terraform/modules/opensearch/variables.tf deleted file mode 100644 index 105c4a47..00000000 --- a/scripts/terraform/modules/opensearch/variables.tf +++ /dev/null @@ -1,70 +0,0 @@ -variable "cluster_instance_type" { - description = "Instance type for the cluster" - type = string -} - -variable "loadgen_instance_type" { - description = "Instance type for the load generation machine" - type = string -} - -variable "cluster_ami_id" { - description = "AMI ID to use for the cluster" - type = string -} - -variable "loadgen_ami_id" { - description = "AMI ID to use for the load generation machine" - type = string -} - -variable "os_version" { - description = "Version of OpenSearch to deploy" - type = string - default = "3.0.0-beta1-nightly-11019" -} - -variable "distribution_version" { - description = "OSB distribution-version to use" - type = string -} - -variable "ssh_key_name" { - description = "Name of the SSH key to use for the cluster" - type = string -} - -variable "ssh_priv_key" { - description = "SSH Private Key" - type = string -} - -variable "ssh_pub_key" { - description = "SSH Pub Key" - type = string -} - -variable "security_groups" { - description = "List of security groups to apply to the OS instance" - type = list(string) -} - -variable "subnet_id" { - description = "Subnet ID" - type = string -} - -variable "subnet_cidr_block" { - description = "Subnet CIDR Block" - type = string -} - -variable "tags" { - description = "List of Tags to apply to resources" - type = any -} - -variable "password" { - description = "Password for the OS cluster" - type = string -} \ No newline at end of file diff --git a/scripts/terraform/outputs.tf b/scripts/terraform/outputs.tf deleted file mode 100644 index c179301b..00000000 --- a/scripts/terraform/outputs.tf +++ /dev/null @@ -1,17 +0,0 @@ -output "target-cluster-ip" { - value = module.os-cluster[0].os-cluster-ip -} - -output "additional-cluster-ips" { - value = module.os-cluster[0].os-additional-cluster-ips -} - - -output "cluster-password" { - value = random_password.cluster-password.result - sensitive = true -} - -output "ssh_private_key_file" { - value = local_file.private_key.filename -} \ No newline at end of file diff --git a/scripts/terraform/provision-cluster.tf b/scripts/terraform/provision-cluster.tf deleted file mode 100644 index 9c05eaf1..00000000 --- a/scripts/terraform/provision-cluster.tf +++ /dev/null @@ -1,194 +0,0 @@ -terraform { - required_providers { - aws = { - source = "hashicorp/aws" - version = "5.65.0" - } - - random = { - source = "hashicorp/random" - version = "3.6.2" - } - - external = { - source = "hashicorp/external" - version = "2.3.4" - } - } -} - -provider "aws" { - region = var.aws_region - - default_tags { - tags = { - Workspace = terraform.workspace - Service = "OSB" - } - } -} - -# Created to access the ec2 instances -resource "tls_private_key" "ssh_key" { - algorithm = "RSA" - rsa_bits = 4096 -} - -# Save private key to a local file -resource "local_file" "private_key" { - content = tls_private_key.ssh_key.private_key_pem - filename = "${path.module}/private_key-${terraform.workspace}.pem" - file_permission = "0600" -} - -# Referred to for ssh`ing into the ec2 instance -resource "aws_key_pair" "ssh_key" { - key_name = "${terraform.workspace}-ssh-key" - public_key = tls_private_key.ssh_key.public_key_openssh -} - -/* -In these next steps, we are setting up the security -configurations for accessing the ec2 instance where -OpenSearch is running. This is required for accessing -the EC2 instances through ssh. -*/ -resource "aws_vpc" "vpc" { - cidr_block = "10.0.0.0/16" - enable_dns_hostnames = true - enable_dns_support = true -} - -resource "aws_subnet" "subnet" { - cidr_block = cidrsubnet(aws_vpc.vpc.cidr_block, 3, 1) - vpc_id = aws_vpc.vpc.id - availability_zone = var.aws_subnet_zone -} - -resource "aws_internet_gateway" "gtw" { - vpc_id = aws_vpc.vpc.id -} - -resource "aws_security_group" "allow_osb" { - name = "${terraform.workspace}-allow-osb" - description = "Allow ES/OS/OSB inbound traffic and all outbound traffic" - vpc_id = aws_vpc.vpc.id - - lifecycle { - create_before_destroy = true - } -} - -resource "aws_vpc_security_group_ingress_rule" "allow_ssh" { - security_group_id = aws_security_group.allow_osb.id - cidr_ipv4 = "0.0.0.0/0" - from_port = 22 - ip_protocol = "tcp" - to_port = 22 -} - -resource "aws_vpc_security_group_ingress_rule" "allow_es_cluster_traffic_9200" { - security_group_id = aws_security_group.allow_osb.id - cidr_ipv4 = "10.0.0.0/16" - from_port = 9200 - to_port = 9200 - ip_protocol = "tcp" -} - -resource "aws_vpc_security_group_egress_rule" "allow_all_traffic_ipv4" { - security_group_id = aws_security_group.allow_osb.id - cidr_ipv4 = "0.0.0.0/0" - ip_protocol = "-1" # semantically equivalent to all ports -} - -resource "aws_route_table" "route-table-test-env" { - vpc_id = aws_vpc.vpc.id - route { - cidr_block = "0.0.0.0/0" - gateway_id = aws_internet_gateway.gtw.id - } -} - -resource "aws_route_table_association" "subnet-association" { - subnet_id = aws_subnet.subnet.id - route_table_id = aws_route_table.route-table-test-env.id -} - -data "aws_ami" "ubuntu_ami_amd64" { - most_recent = true - - filter { - name = "name" - values = ["ubuntu/images/hvm-ssd/ubuntu-jammy-22.04-amd64-server-*"] - } - - filter { - name = "virtualization-type" - values = ["hvm"] - } - - owners = ["099720109477"] -} - -data "aws_ami" "ubuntu_ami_arm64" { - most_recent = true - - filter { - name = "name" - values = ["ubuntu/images/hvm-ssd/ubuntu-jammy-22.04-arm64-server-*"] - } - - filter { - name = "virtualization-type" - values = ["hvm"] - } - - owners = ["099720109477"] -} - -resource "random_password" "cluster-password" { - length = 16 - special = false - min_lower = 1 - min_upper = 0 - min_numeric = 1 -} - -locals { - default_cluster_instance = "c5d.2xlarge" - default_loadgen_instance = "c5d.2xlarge" - cluster_instance_type = local.default_cluster_instance - loadgen_instance_type = local.default_loadgen_instance - - default_cluster_ami = data.aws_ami.ubuntu_ami_amd64.id - default_loadgen_ami = data.aws_ami.ubuntu_ami_amd64.id - cluster_ami_id = local.default_cluster_ami - loadgen_ami_id = local.default_loadgen_ami -} - -module "os-cluster" { - count = 1 - - source = "./modules/opensearch" - cluster_instance_type = local.cluster_instance_type - loadgen_instance_type = local.loadgen_instance_type - cluster_ami_id = local.cluster_ami_id - loadgen_ami_id = local.loadgen_ami_id - os_version = var.os_version - distribution_version = var.distribution_version - ssh_key_name = aws_key_pair.ssh_key.key_name - ssh_priv_key = tls_private_key.ssh_key.private_key_openssh - ssh_pub_key = tls_private_key.ssh_key.public_key_openssh - security_groups = [aws_security_group.allow_osb.id] - subnet_id = aws_subnet.subnet.id - subnet_cidr_block = aws_subnet.subnet.cidr_block - password = random_password.cluster-password.result - - providers = { - aws = aws - } - - tags = { - Name = "target-cluster" - } -} diff --git a/scripts/terraform/resources.sh b/scripts/terraform/resources.sh deleted file mode 100644 index 1f856aca..00000000 --- a/scripts/terraform/resources.sh +++ /dev/null @@ -1,57 +0,0 @@ -#!/bin/bash - -if [ -z "$AWS_ACCESS_KEY_ID" ]; then - echo "AWS_ACCESS_KEY_ID is not set" - exit 1 -fi -if [ -z "$AWS_SECRET_ACCESS_KEY" ]; then - echo "AWS_SECRET_ACCESS_KEY is not set" - exit 1 -fi - -# Function to get all AWS regions -get_all_regions() { - aws ec2 describe-regions --region us-east-1 --query 'Regions[].RegionName' --output text -} - -get_resource() { - title="$1" - command="$2" - if [ -n "${command}" ]; then - echo "${title} ${command}" - fi -} - -# Print info -output_info() { - region="$1" - resources="$2" - - if [ ${#resources[@]} -gt 0 ]; then - echo -e "Region: ${region}\n" - for r in "${resources[@]}"; do - if [ ! "$r" == "" ]; then - echo -e "${r}\n" - fi - done - printf "+%.0s" {1..80} - echo "" - fi -} - -# Loop through each region and list/delete resources -for region in $(get_all_regions); do - # Set AWS region for the commands - export AWS_DEFAULT_REGION=$region - - ec2_instances=$(get_resource "EC2:" "$(aws ec2 describe-instances --query 'Reservations[].Instances[].InstanceId' --output text)") - s3_buckets=$(get_resource "S3:" "$(aws s3api list-buckets --query 'Buckets[].Name' --output text)") - vpcs=$(get_resource "VPC:" "$(aws ec2 describe-vpcs --query 'Vpcs[].VpcId' --output text)") - secrets=$(get_resource "SecretsManager:" "$(aws secretsmanager list-secrets --query 'SecretList[].Name' --output text)") - kms=$(get_resource "KMS:" "$(aws kms list-keys --query 'Keys[].KeyId' --output text)") - sg=$(get_resource "SecurityGroup:" "$(aws ec2 describe-security-groups --query 'SecurityGroups[].GroupId' --output text)") - ip=$(get_resource "IP:" "$(aws ec2 describe-addresses --query 'Addresses[].PublicIp' --output text)") - - resources=("$ec2_instances" "$s3_buckets" "$vpcs" "$secrets" "$kms" "$sg" "$ip") - output_info "$region" "${resources[@]}" -done \ No newline at end of file diff --git a/scripts/terraform/terraform.tfvars.example b/scripts/terraform/terraform.tfvars.example deleted file mode 100644 index 4fbb5605..00000000 --- a/scripts/terraform/terraform.tfvars.example +++ /dev/null @@ -1,2 +0,0 @@ -aws_region = "eu-west-1" -aws_subnet_zone = "eu-west-1a" \ No newline at end of file diff --git a/scripts/terraform/variables.tf b/scripts/terraform/variables.tf deleted file mode 100644 index 7675a68c..00000000 --- a/scripts/terraform/variables.tf +++ /dev/null @@ -1,21 +0,0 @@ -variable "aws_region" { - description = "AWS region used for the deployment" - type = string -} - -variable "aws_subnet_zone" { - description = "AWS subnet availability zone, tied to the aws_region used" - type = string -} - -variable "os_version" { - description = "Version of OpenSearch to deploy" - type = string - default = "3.0.0-beta1-nightly-11019" -} - -variable "distribution_version" { - description = "OSB distribution-version to use" - type = string - default = "3.0.0-beta1-nightly-11019" -} \ No newline at end of file diff --git a/tests/builder/downloaders/core_plugin_source_downloader_test.py b/tests/builder/downloaders/core_plugin_source_downloader_test.py deleted file mode 100644 index 30bdee87..00000000 --- a/tests/builder/downloaders/core_plugin_source_downloader_test.py +++ /dev/null @@ -1,23 +0,0 @@ -from unittest import TestCase -from unittest.mock import Mock - -from osbenchmark.builder.downloaders.core_plugin_source_downloader import CorePluginSourceDownloader -from osbenchmark.builder.cluster_config import PluginDescriptor - - -class CorePluginSourceDownloaderTest(TestCase): - def setUp(self): - self.host = None - - self.executor = Mock() - self.source_repository_provider = Mock() - self.plugin = PluginDescriptor(name="my-plugin") - self.builder = Mock() - self.opensearch_source_dir = "/fake/path" - - self.source_downloader = CorePluginSourceDownloader(self.plugin, self.executor, self.source_repository_provider, - self.builder, self.opensearch_source_dir) - - def test_download(self): - plugin_binary = self.source_downloader.download(self.host) - self.assertEqual(plugin_binary, {"my-plugin": "file:///fake/path/plugins/my-plugin/build/distributions/*.zip"}) diff --git a/tests/builder/downloaders/external_plugin_source_downloader_test.py b/tests/builder/downloaders/external_plugin_source_downloader_test.py deleted file mode 100644 index 6fafab22..00000000 --- a/tests/builder/downloaders/external_plugin_source_downloader_test.py +++ /dev/null @@ -1,54 +0,0 @@ -from unittest import TestCase, mock -from unittest.mock import Mock - -from osbenchmark.builder.downloaders.external_plugin_source_downloader import ExternalPluginSourceDownloader -from osbenchmark.builder.cluster_config import PluginDescriptor - - -class ExternalPluginSourceDownloaderTest(TestCase): - def setUp(self): - self.executor = Mock() - self.source_repo_provider = Mock() - self.binary_builder = Mock() - - self.host = None - self.plugin_config_instance = PluginDescriptor(name="my-plugin", variables={ - "source": { - "remote": { - "repo": { - "url": "https//fake.url.com" - } - }, - "revision": "current", - "build": { - "command": "gradle build", - "artifact": { - "subdir": "plugin/subdir" - } - }, - } - }) - self.plugin_src_dir = "/fake/dir/for/plugin" - - self.external_plugin_source_downloader = ExternalPluginSourceDownloader(self.plugin_config_instance, self.executor, - self.source_repo_provider, self.binary_builder, - self.plugin_src_dir) - - def test_download_with_build(self): - plugin_binary = self.external_plugin_source_downloader.download(self.host) - self.assertEqual(plugin_binary, {"my-plugin": "file:///fake/dir/for/plugin/plugin/subdir/*.zip"}) - self.source_repo_provider.fetch_repository.assert_has_calls([ - mock.call(self.host, "https//fake.url.com", "current", self.plugin_src_dir) - ]) - self.binary_builder.build.assert_has_calls([ - mock.call(self.host, ["gradle build"], override_source_directory=self.plugin_src_dir) - ]) - - def test_download_without_build(self): - self.binary_builder = None - - plugin_binary = self.external_plugin_source_downloader.download(self.host) - self.assertEqual(plugin_binary, {"my-plugin": "file:///fake/dir/for/plugin/plugin/subdir/*.zip"}) - self.source_repo_provider.fetch_repository.assert_has_calls([ - mock.call(self.host, "https//fake.url.com", "current", self.plugin_src_dir) - ]) diff --git a/tests/builder/downloaders/plugin_distribution_downloader_test.py b/tests/builder/downloaders/plugin_distribution_downloader_test.py deleted file mode 100644 index b4cb2dfb..00000000 --- a/tests/builder/downloaders/plugin_distribution_downloader_test.py +++ /dev/null @@ -1,28 +0,0 @@ -from unittest import TestCase -from unittest.mock import Mock - -from osbenchmark.builder.downloaders.plugin_distribution_downloader import PluginDistributionDownloader -from osbenchmark.builder.cluster_config import PluginDescriptor - - -class PluginDistributionDownloaderTest(TestCase): - def setUp(self): - self.host = None - - self.executor = Mock() - self.plugin = PluginDescriptor(name="my plugin") - - self.distribution_repository_provider = Mock() - self.plugin_distro_downloader = PluginDistributionDownloader(self.plugin, self.executor, self.distribution_repository_provider) - - def test_plugin_url_exists(self): - self.plugin_distro_downloader.distribution_repository_provider.get_download_url.return_value = "https://fake" - - binaries_map = self.plugin_distro_downloader.download(self.host) - self.assertEqual(binaries_map, {"my plugin": "https://fake"}) - - def test_plugin_url_does_not_exist(self): - self.plugin_distro_downloader.distribution_repository_provider.get_download_url.return_value = None - - binaries_map = self.plugin_distro_downloader.download(self.host) - self.assertEqual(binaries_map, {}) diff --git a/tests/builder/installers/preparers/plugin_preparer_test.py b/tests/builder/installers/preparers/plugin_preparer_test.py deleted file mode 100644 index 2830dbd6..00000000 --- a/tests/builder/installers/preparers/plugin_preparer_test.py +++ /dev/null @@ -1,41 +0,0 @@ -from unittest import TestCase, mock -from unittest.mock import Mock - -from osbenchmark.builder.installers.preparers.plugin_preparer import PluginPreparer -from osbenchmark.builder.models.host import Host -from osbenchmark.builder.models.node import Node -from osbenchmark.builder.cluster_config import PluginDescriptor - - -class PluginPreparerTest(TestCase): - def setUp(self): - self.node = Node(binary_path="/fake_binary_path", data_paths=["/fake1", "/fake2"], name=None, - pid=None, telemetry=None, port=None, root_dir=None, log_path=None, heap_dump_path=None) - self.host = Host(name="fake", address="10.17.22.23", metadata={}, node=self.node) - self.binaries = {"unit-test-plugin": "/data/builds/distributions"} - self.all_node_ips = [] - - self.executor = Mock() - self.hook_handler_class = Mock() - self.plugin = PluginDescriptor(name="unit-test-plugin", config_paths=["default"], variables={"active": True}) - - self.plugin_preparer = PluginPreparer(self.plugin, self.executor, self.hook_handler_class) - - def test_plugin_install_with_binary_path(self): - self.plugin_preparer.prepare(self.host, self.binaries) - - self.executor.execute.assert_has_calls([ - mock.call(self.host, "/fake_binary_path/bin/opensearch-plugin install --batch \"/data/builds/distributions\"") - ]) - - def test_plugin_install_without_binary_path(self): - self.plugin_preparer.prepare(self.host, {}) - - self.executor.execute.assert_has_calls([ - mock.call(self.host, "/fake_binary_path/bin/opensearch-plugin install --batch \"unit-test-plugin\"") - ]) - - def test_config_vars(self): - config_vars = self.plugin_preparer.get_config_vars(self.host, self.node, self.all_node_ips) - - self.assertEqual(config_vars, {"active": True}) diff --git a/tests/data_streaming/__init__.py b/tests/data_streaming/__init__.py deleted file mode 100644 index f8fe1c55..00000000 --- a/tests/data_streaming/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. diff --git a/tests/data_streaming/producer_test.py b/tests/data_streaming/producer_test.py deleted file mode 100644 index 8673b567..00000000 --- a/tests/data_streaming/producer_test.py +++ /dev/null @@ -1,172 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. - -# pylint: disable=protected-access - -from unittest import TestCase -import unittest.mock as mock - -from osbenchmark.cloud_provider.vendors.s3_data_producer import S3DataProducer - -# pylint: disable=too-many-public-methods -class TestS3DataProducer(TestCase): - - # pylint: disable = arguments-differ - @mock.patch('osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer.__init__', return_value=None) - def setUp(self, mock_init): - self.producer = S3DataProducer('bucket', 'key', None) - - def test_gen_range_args_aligned(self): - self.assertEqual(self.producer._gen_range_args(0, 8, 4), ['bytes=0-3', 'bytes=4-7']) - - def test_gen_range_args_unaligned(self): - self.assertEqual(self.producer._gen_range_args(0, 10, 4), ['bytes=0-3', 'bytes=4-7', 'bytes=8-9']) - - def test_gen_range_args_empty(self): - self.assertEqual(self.producer._gen_range_args(0, 0, 4), []) - - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._output_chunk") - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._get_next_downloader") - def test_generate_chunked_data_aligned(self, downloader, outputter): - downloader.return_value = [ [ b"this is line 1\n" ] ] - self.producer.generate_chunked_data() - outputter.assert_has_calls([ mock.call("this is line 1\n", 0) ]) - - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._output_chunk") - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._get_next_downloader") - def test_generate_chunked_data_unaligned(self, downloader, outputter): - downloader.return_value = [ [ b"this is line 1\nthis is line 2" ] ] - self.producer.generate_chunked_data() - outputter.assert_has_calls([ mock.call("this is line 1\n", 0) ]) - - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._output_chunk") - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._get_next_downloader") - def test_generate_chunked_data_aligned_multiline(self, downloader, outputter): - downloader.return_value = [ [ b"this is line 0\nthis is line 1\n" ] ] - self.producer.generate_chunked_data() - outputter.assert_has_calls([ mock.call("this is line 0\nthis is line 1\n", 0) ]) - - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._output_chunk") - @mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._get_next_downloader") - def test_generate_chunked_data_unaligned_multiline(self, downloader, outputter): - downloader.return_value = [ [ b"this is line 0\nthis is line 1\nthis is line 2" ] ] - self.producer.generate_chunked_data() - outputter.assert_has_calls([ mock.call("this is line 0\nthis is line 1\n", 0) ]) - - def int_generator(self, n): - for i in range(n): - yield i - - def get_object_subrange(self, args): - return next(self.get_obj_subrange_generator) - - def test_multipart_downloader_4_2(self): - self.producer.chunk_size = 4 - self.producer.num_workers = 2 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1, 2]) - - def test_multipart_downloader_4_1(self): - self.producer.chunk_size = 4 - self.producer.num_workers = 1 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1, 2]) - - def test_multipart_downloader_4_4(self): - self.producer.chunk_size = 4 - self.producer.num_workers = 4 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1, 2]) - - def test_multipart_downloader_4_8(self): - self.producer.chunk_size = 4 - self.producer.num_workers = 8 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1, 2]) - - def test_multipart_downloader_8_2(self): - self.producer.chunk_size = 8 - self.producer.num_workers = 2 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1]) - - def test_multipart_downloader_8_4(self): - self.producer.chunk_size = 8 - self.producer.num_workers = 4 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1]) - - def test_multipart_downloader_8_1(self): - self.producer.chunk_size = 8 - self.producer.num_workers = 4 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1]) - - def test_multipart_downloader_8_5(self): - self.producer.chunk_size = 8 - self.producer.num_workers = 5 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1]) - - def test_multipart_downloader_8_8(self): - self.producer.chunk_size = 8 - self.producer.num_workers = 8 - self.get_obj_subrange_generator = self.int_generator(40) - - with mock.patch("osbenchmark.cloud_provider.vendors.s3_data_producer.S3DataProducer._s3_get_object_subrange", wraps=self.get_object_subrange): - generator = self.producer._s3_multipart_downloader('bucket', 'key', 0, 12) - self.assertEqual(list(generator), [0, 1]) - - def test_get_keys_single(self): - self.producer.keys = "file.json" - self.assertEqual(list(self.producer._get_next_key()), ["file.json"]) - - def test_get_keys_glob_0(self): - self.producer.bucket = None - self.producer.keys = "file*" - self.producer.s3_client = mock.Mock() - self.producer.s3_client.list_objects.return_value = { "Contents": [] } - self.assertEqual(list(self.producer._get_next_key()), []) - - def test_get_keys_glob_1(self): - self.producer.bucket = None - self.producer.keys = "file*" - self.producer.s3_client = mock.Mock() - self.producer.s3_client.list_objects.return_value = { "Contents": [ { "Key": "file1" } ] } - self.assertEqual(list(self.producer._get_next_key()), ["file1"]) - - def test_get_keys_glob_2(self): - self.producer.bucket = None - self.producer.keys = "file*" - self.producer.s3_client = mock.Mock() - self.producer.s3_client.list_objects.return_value = { "Contents": [ { "Key": "file1" }, { "Key": "file2" } ] } - self.assertEqual(list(self.producer._get_next_key()), ["file1", "file2"]) diff --git a/tests/kafka_client_test.py b/tests/kafka_client_test.py deleted file mode 100644 index 366fecb1..00000000 --- a/tests/kafka_client_test.py +++ /dev/null @@ -1,133 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from unittest import TestCase, mock - -from osbenchmark.kafka_client import KafkaMessageProducer -from osbenchmark.client import MessageProducerFactory -from tests import run_async - - -class KafkaMessageProducerTests(TestCase): - @run_async - @mock.patch("osbenchmark.kafka_client.AIOKafkaProducer") - async def test_create_producer_with_valid_params(self, mock_aio_kafka_producer_class): - mock_producer_instance = mock.AsyncMock() - mock_aio_kafka_producer_class.return_value = mock_producer_instance - mock_producer_instance.start.return_value = None - - params = { - "ingestion-source": { - "type": "kafka", - "param": { - "topic": "test-topic", - "bootstrap-servers": "localhost:9092" - } - } - } - - producer = await KafkaMessageProducer.create(params) - - mock_aio_kafka_producer_class.assert_called_once_with( - bootstrap_servers="localhost:9092", - key_serializer=str.encode, - value_serializer=str.encode - ) - mock_producer_instance.start.assert_awaited_once() - self.assertIsInstance(producer, KafkaMessageProducer) - # pylint: disable=protected-access - self.assertEqual("test-topic", producer._topic) - - @run_async - async def test_create_producer_missing_topic_raises_error(self): - params = { - "ingestion-source": { - "type": "kafka", - "param": { - # no "topic" entry - "bootstrap-servers": "localhost:9092" - } - } - } - - with self.assertRaisesRegex(ValueError, "No 'topic' specified"): - await KafkaMessageProducer.create(params) - - @run_async - async def test_send_message(self): - mock_producer = mock.AsyncMock() - topic = "test-topic" - producer = KafkaMessageProducer(mock_producer, topic) - - await producer.send_message("test", key="key") - mock_producer.send_and_wait.assert_awaited_once_with( - topic, "test", key="key" - ) - - @run_async - async def test_stop(self): - mock_producer = mock.AsyncMock() - topic = "test-topic" - producer = KafkaMessageProducer(mock_producer, topic) - await producer.stop() - mock_producer.stop.assert_awaited_once() - - -class MessageProducerFactoryTests(TestCase): - @run_async - @mock.patch("osbenchmark.kafka_client.AIOKafkaProducer") - async def test_create_kafka_producer_via_factory(self, mock_aio_kafka_producer_class): - mock_producer_instance = mock.AsyncMock() - mock_aio_kafka_producer_class.return_value = mock_producer_instance - mock_producer_instance.start.return_value = None - - params = { - "ingestion-source": { - "type": "kafka", - "param": { - "topic": "factory-topic", - "bootstrap-servers": "localhost:9092" - } - } - } - - producer = await MessageProducerFactory.create(params) - # The returned instance should be a KafkaMessageProducer - self.assertIsInstance(producer, KafkaMessageProducer) - # pylint: disable=protected-access - self.assertEqual("factory-topic", producer._topic) - mock_aio_kafka_producer_class.assert_called_once() - - @run_async - async def test_create_unsupported_type_raises_error(self): - params = { - "ingestion-source": { - "type": "unknown", - "param": { - "topic": "test" - } - } - } - with self.assertRaisesRegex(ValueError, "Unsupported ingestion source type: unknown"): - await MessageProducerFactory.create(params) diff --git a/tests/test_async_connection.py b/tests/test_async_connection.py deleted file mode 100644 index 6c3678ef..00000000 --- a/tests/test_async_connection.py +++ /dev/null @@ -1,62 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import json -from unittest import TestCase - -from osbenchmark.async_connection import ResponseMatcher - - -class ResponseMatcherTests(TestCase): - def test_matches(self): - matcher = ResponseMatcher(responses=[ - { - "path": "*/_bulk", - "body": { - "response-type": "bulk", - } - }, - { - "path": "/_cluster/health*", - "body": { - "response-type": "cluster-health", - } - }, - { - "path": "*", - "body": { - "response-type": "default" - } - } - ]) - - self.assert_response_type(matcher, "/_cluster/health", "cluster-health") - self.assert_response_type(matcher, "/_cluster/health/geonames", "cluster-health") - self.assert_response_type(matcher, "/geonames/_bulk", "bulk") - self.assert_response_type(matcher, "/geonames", "default") - self.assert_response_type(matcher, "/geonames/force_merge", "default") - - def assert_response_type(self, matcher, path, expected_response_type): - response = json.loads(matcher.response(path)) - self.assertEqual(response["response-type"], expected_response_type) diff --git a/tests/worker_coordinator/proto_bulk_helper_test.py b/tests/worker_coordinator/proto_bulk_helper_test.py deleted file mode 100644 index 5d9f4e3e..00000000 --- a/tests/worker_coordinator/proto_bulk_helper_test.py +++ /dev/null @@ -1,124 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from unittest import TestCase - -from opensearch.protobufs.schemas import document_pb2 -from osbenchmark.worker_coordinator.proto_helpers.ProtoBulkHelper import ProtoBulkHelper - -class ProtoBulkHelperTests(TestCase): - def test_build_proto_request_single_document(self): - params = { - "index": "test-index", - "body": b'{"index": {"_index": "test-index"}}\n{"field1": "value1", "field2": "value2"}\n' - } - - result = ProtoBulkHelper.build_proto_request(params) - - self.assertIsInstance(result, document_pb2.BulkRequest) - self.assertEqual(result.index, "test-index") - self.assertEqual(len(result.request_body), 1) - self.assertEqual(result.request_body[0].object, b'{"field1": "value1", "field2": "value2"}') - self.assertTrue(result.request_body[0].operation_container.HasField("index")) - - def test_build_proto_request_multiple_documents(self): - params = { - "index": "test-index", - "body": (b'{"index": {"_index": "test-index"}}\n' - b'{"field1": "value1"}\n' - b'{"index": {"_index": "test-index"}}\n' - b'{"field1": "value2"}\n') - } - - result = ProtoBulkHelper.build_proto_request(params) - - self.assertIsInstance(result, document_pb2.BulkRequest) - self.assertEqual(result.index, "test-index") - self.assertEqual(len(result.request_body), 2) - self.assertEqual(result.request_body[0].object, b'{"field1": "value1"}') - self.assertEqual(result.request_body[1].object, b'{"field1": "value2"}') - - def test_build_stats_success_response(self): - mock_bulk_response = document_pb2.BulkResponse() - mock_bulk_response.took = 100 - - for _ in range(3): - item = document_pb2.Item() - item.index.status = 201 - mock_bulk_response.items.append(item) - - params = { - "index": "test-index", - "bulk-size": 3, - "unit": "ops" - } - - result = ProtoBulkHelper.build_stats(mock_bulk_response, params) - - expected = { - "index": "test-index", - "weight": 3, - "unit": "ops", - "took": 100, - "success": True, - "success-count": 3, - "error-count": 0, - } - - self.assertEqual(result, expected) - - def test_build_stats_bulk_error_response_status(self): - mock_bulk_response = document_pb2.BulkResponse() - mock_bulk_response.errors = True - - params = { - "index": "test-index", - "bulk-size": 15, - "unit": "ops" - } - - result = ProtoBulkHelper.build_stats(mock_bulk_response, params) - - expected = { - "index": "test-index", - "weight": 15, - "unit": "ops", - "took": None, - "success": False, - "success-count": 0, - "error-count": 15, - "error-type": "bulk" - } - - self.assertEqual(result, expected) - - def test_build_stats_detailed_results_raises_exception(self): - mock_bulk_response = document_pb2.BulkResponse() - - params = {"detailed-results": True} - - with self.assertRaises(Exception) as ctx: - ProtoBulkHelper.build_stats(mock_bulk_response, params) - - self.assertIn("Detailed results not supported for gRPC bulk requests", str(ctx.exception)) diff --git a/tests/worker_coordinator/proto_query_helper_test.py b/tests/worker_coordinator/proto_query_helper_test.py deleted file mode 100644 index 1ed5f6aa..00000000 --- a/tests/worker_coordinator/proto_query_helper_test.py +++ /dev/null @@ -1,356 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from unittest import TestCase - -import numpy as np -from opensearch.protobufs.schemas import search_pb2 - -from osbenchmark.worker_coordinator.proto_helpers.ProtoQueryHelper import ProtoQueryHelper - -class ProtoQueryHelperTests(TestCase): - def test_build_proto_request_match_all_query(self): - params = { - "index": "test-index", - "body": { - "query": { - "match_all": {} - }, - "size": 10, - "_source": True - }, - "request-timeout": 5000, - "cache": "true" - } - - request = ProtoQueryHelper.build_proto_request(params) - - self.assertIsInstance(request, search_pb2.SearchRequest) - self.assertEqual(request.index, ["test-index"]) - self.assertEqual(request.request_body.size, 10) - self.assertEqual(request.request_body.timeout, "5000ms") - self.assertTrue(request.request_cache) - self.assertTrue(request.x_source.bool) - self.assertTrue(request.request_body.query.HasField("match_all")) - - def test_build_proto_request_term_query(self): - params = { - "index": "test-index", - "body": { - "query": { - "term": { - "log.file.path": { - "value" : "/var/log/messages/birdknight" - } - } - } - } - } - - result = ProtoQueryHelper.build_proto_request(params) - - self.assertIsInstance(result, search_pb2.SearchRequest) - self.assertTrue(result.request_body.query.HasField("term")) - self.assertEqual(result.request_body.query.term.field, "log.file.path") - self.assertEqual(result.request_body.query.term.value.string, "/var/log/messages/birdknight") - - def test_build_proto_request_term_query_multi_field_fails(self): - params = { - "index": "test-index", - "body": { - "query": { - "term": { - "log.file.path": { - "value" : ["/var/log/messages/birdknight", "/var/log/messages/otherterm"] - } - } - } - } - } - - with self.assertRaises(Exception): - ProtoQueryHelper.build_proto_request(params) - -class ProtoKNNQueryHelperTests(TestCase): - def test_build_vector_search_proto_request_basic(self): - params = { - 'body': { - 'query': { - 'knn': { - 'target_field': { - 'vector': np.array([1.49081022e-01], dtype=np.float32), - 'k': 100 - } - } - }, - 'size': 100, - 'docvalue_fields': ['_id'], - 'stored_fields': '_none_', - }, - 'request-params': { - '_source': False, - 'allow_partial_search_results': 'false' - }, - 'k': 100, - 'index': 'target_index', - 'cache': None, - 'request-timeout': None, - 'profile-query': False - - } - - request = ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIsInstance(request, search_pb2.SearchRequest) - self.assertTrue(request.request_body.query.HasField('knn')) - self.assertEqual(request.request_body.query.knn.field, 'target_field') - self.assertAlmostEqual(request.request_body.query.knn.vector[0], 1.49081022e-01, places=5) - self.assertEqual(request.request_body.query.knn.k, 100) - - self.assertEqual(request.request_body.size, 100) - self.assertEqual(request.docvalue_fields, ['_id']) - - self.assertEqual(request.stored_fields, ["_none_"]) - - self.assertFalse(request.x_source.bool) - self.assertFalse(request.allow_partial_search_results) - - self.assertEqual(request.index, ['target_index']) - self.assertEqual(request.request_cache, False) - self.assertEqual(request.timeout, '') - self.assertEqual(request.request_body.profile, False) - - def test_build_vector_search_proto_defaults_with_optional_fields_empty(self): - params = { - 'body': { - 'query': { - 'knn': { - 'knn_field': { - 'vector': np.array([1.49081022e-01], dtype=np.float32), - 'k': 100 - } - } - }, - }, - 'request-params': {}, - 'index': 'index_required', - 'k': 100, - } - - request = ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIsInstance(request, search_pb2.SearchRequest) - self.assertTrue(request.request_body.query.HasField('knn')) - self.assertEqual(request.request_body.query.knn.field, 'knn_field') - self.assertAlmostEqual(request.request_body.query.knn.vector[0], 1.49081022e-01, places=5) - self.assertEqual(request.request_body.query.knn.k, 100) - self.assertEqual(request.index, ['index_required']) - - # expected defaults when these fields are not set - self.assertEqual(request.request_body.size, 0) - self.assertEqual(request.docvalue_fields, []) - self.assertEqual(request.stored_fields, ["_none_"]) - self.assertFalse(request.x_source.bool) - self.assertFalse(request.allow_partial_search_results) - self.assertEqual(request.request_cache, False) - self.assertEqual(request.timeout, '') - self.assertEqual(request.request_body.profile, False) - - def test_build_vector_search_proto_stored_fields_non_list_parsing(self): - params = { - 'body': { - 'query': { - 'knn': { - 'knn_field': { - 'vector': np.array([1.0], dtype=np.float32), - 'k': 100 - } - } - }, - 'docvalue_fields': ['_id'], - 'stored_fields': "_none_", - }, - 'request-params': {}, - 'index': 'index_required', - 'k': 100, - } - - request = ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertEqual(request.stored_fields, ["_none_"]) - - params['body']['stored_fields'] = ['field1', 'field2'] - request = ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertEqual(request.stored_fields, ['field1', 'field2']) - - params['body']['stored_fields'] = 'field1' - - with self.assertRaises(Exception) as context: - ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIn('Stored fields must be a list', str(context.exception)) - - def test_build_vector_search_proto_string_or_bool(self): - params = { - 'body': { - 'query': { - 'knn': { - 'target_field': { - 'vector': np.array([1.49081022e-01], dtype=np.float32), - 'k': 100 - } - } - }, - }, - 'request-params': { - '_source': "true", - 'allow_partial_search_results': False, - }, - 'cache': None, - 'k': 100, - 'index': 'target_index', - 'profile-query': "fAlSe" - } - - request = ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIsInstance(request, search_pb2.SearchRequest) - self.assertTrue(request.request_body.query.HasField('knn')) - self.assertEqual(request.request_body.query.knn.field, 'target_field') - self.assertAlmostEqual(request.request_body.query.knn.vector[0], 1.49081022e-01, places=5) - self.assertEqual(request.request_body.query.knn.k, 100) - self.assertEqual(request.index, ['target_index']) - - # bools are provided in several forms: True/False/None/"True"/"False"/"true"/"false" - self.assertTrue(request.x_source.bool) - self.assertFalse(request.allow_partial_search_results) - self.assertEqual(request.request_cache, False) - self.assertEqual(request.request_body.profile, False) - - def test_build_vector_search_proto_ignore_params(self): - params = { - 'body': { - 'query': { - 'knn': { - 'target_field': { - 'vector': np.array([1.49081022e-01], dtype=np.float32), - 'k': 100 - } - } - }, - }, - 'request-params': {}, - 'index': 'target_index', - } - - basic_request = ProtoQueryHelper.build_vector_search_proto_request(params) - - # http params to ignore - params["opaque-id"] = '1234' - params["headers"] = { - 'sample_header': 'sample_header' - } - - # neighbors not supported but always provided - params["neighbors"] = np.array([1.2, 2.3, 5.5], dtype=np.float32) - - ignore_request = ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertEqual(ignore_request, basic_request) - - def test_build_vector_search_proto_request_detailed_results_raises_error(self): - params = { - 'index': 'test_index', - 'body': { - 'query': { - 'knn': { - 'vector_field': { - 'vector': np.array([1.0], dtype=np.float32), - 'k': 5 - } - } - } - }, - 'k': 5, - 'request-params': { - '_source': False - }, - 'detailed-results': True - } - - with self.assertRaises(NotImplementedError) as context: - ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIn('Detailed results not supported', str(context.exception)) - - def test_build_vector_search_proto_request_calculate_recall_raises_error(self): - params = { - 'index': 'test_index', - 'body': { - 'query': { - 'knn': { - 'vector_field': { - 'vector': np.array([1.0], dtype=np.float32), - 'k': 5 - } - } - } - }, - 'k': 5, - 'request-params': { - '_source': False - }, - 'calculate-recall': True - } - - with self.assertRaises(NotImplementedError) as context: - ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIn('Recall calculations not supported', str(context.exception)) - - def test_build_vector_search_proto_request_compression_raises_error(self): - params = { - 'index': 'test_index', - 'body': { - 'query': { - 'knn': { - 'vector_field': { - 'vector': np.array([1.0], dtype=np.float32), - 'k': 5 - } - } - } - }, - 'k': 5, - 'request-params': { - '_source': False - }, - 'response-compression-enabled': True - } - - with self.assertRaises(NotImplementedError) as context: - ProtoQueryHelper.build_vector_search_proto_request(params) - - self.assertIn('Compression not supported', str(context.exception))
