This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 67d2c29d382 [fix][admin] Fix `validatePersistencePolicies` that
Namespace/Topic persistent policies cannot set to < 0 (#18999)
67d2c29d382 is described below
commit 67d2c29d3821ad2cdf6b26d3132861d32757412a
Author: Tao Jiuming <[email protected]>
AuthorDate: Sat Jan 28 11:12:24 2023 +0800
[fix][admin] Fix `validatePersistencePolicies` that Namespace/Topic
persistent policies cannot set to < 0 (#18999)
(cherry picked from commit 4cae20ce9e0bd6be9843893abff2866a03a4556f)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 12 ++
.../pulsar/broker/admin/TopicPoliciesTest.java | 16 ++
.../broker/auth/MockedPulsarServiceBaseTest.java | 13 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 23 ++-
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 23 ++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 24 ++-
.../elastic/ElasticSearchJavaRestClient.java | 224 +++++++++++++++++++++
.../io/sinks/ElasticSearch7SinkTester.java | 41 ++++
.../io/sinks/ElasticSearch8SinkTester.java | 43 ++++
.../integration/io/sinks/OpenSearchSinkTester.java | 88 ++++++++
11 files changed, 491 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 305266ce799..0e252cac61b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -767,12 +767,15 @@ public abstract class AdminResource extends
PulsarWebResource {
protected void validatePersistencePolicies(PersistencePolicies
persistence) {
checkNotNull(persistence, "persistence policies should not be null");
final ServiceConfiguration config = pulsar().getConfiguration();
- checkArgument(persistence.getBookkeeperEnsemble() <=
config.getManagedLedgerMaxEnsembleSize(),
- "Bookkeeper-Ensemble must be <= " +
config.getManagedLedgerMaxEnsembleSize());
- checkArgument(persistence.getBookkeeperWriteQuorum() <=
config.getManagedLedgerMaxWriteQuorum(),
- "Bookkeeper-WriteQuorum must be <= " +
config.getManagedLedgerMaxWriteQuorum());
- checkArgument(persistence.getBookkeeperAckQuorum() <=
config.getManagedLedgerMaxAckQuorum(),
- "Bookkeeper-AckQuorum must be <= " +
config.getManagedLedgerMaxAckQuorum());
+ checkArgument(persistence.getBookkeeperEnsemble() <=
config.getManagedLedgerMaxEnsembleSize()
+ && persistence.getBookkeeperEnsemble() > 0,
+ "Bookkeeper-Ensemble must be <= " +
config.getManagedLedgerMaxEnsembleSize() + " and > 0.");
+ checkArgument(persistence.getBookkeeperWriteQuorum() <=
config.getManagedLedgerMaxWriteQuorum()
+ && persistence.getBookkeeperWriteQuorum() > 0,
+ "Bookkeeper-WriteQuorum must be <= " +
config.getManagedLedgerMaxWriteQuorum() + " and > 0.");
+ checkArgument(persistence.getBookkeeperAckQuorum() <=
config.getManagedLedgerMaxAckQuorum()
+ && persistence.getBookkeeperAckQuorum() > 0,
+ "Bookkeeper-AckQuorum must be <= " +
config.getManagedLedgerMaxAckQuorum() + " and > 0.");
checkArgument(
(persistence.getBookkeeperEnsemble() >=
persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >=
persistence.getBookkeeperAckQuorum()),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index fdc218a0f05..39fe0727a7f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1069,6 +1069,18 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(persistence2, persistence1);
}
+ @Test(dataProvider = "invalidPersistentPolicies")
+ public void testSetIncorrectPersistentPolicies(int ensembleSize, int
writeQuorum, int ackQuorum) throws Exception {
+ NamespaceName testNs = this.testLocalNamespaces.get(0);
+ PersistencePolicies persistence1 = new
PersistencePolicies(ensembleSize, writeQuorum, ackQuorum, 0.0);
+ try {
+ namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName(), persistence1);
+ fail();
+ } catch (RestException e) {
+ assertEquals(e.getResponse().getStatus(),
Status.BAD_REQUEST.getStatusCode());
+ }
+ }
+
@Test
public void testPersistenceUnauthorized() throws Exception {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index b676eda6746..36f27877360 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -874,6 +874,22 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
consumer.close();
}
+ @Test(dataProvider = "invalidPersistentPolicies")
+ public void testSetIncorrectPersistentPolicies(int ensembleSize, int
writeQuorum, int ackQuorum) throws Exception {
+ admin.topics().createNonPartitionedTopic(persistenceTopic);
+ PersistencePolicies persistence1 = new
PersistencePolicies(ensembleSize, writeQuorum, ackQuorum, 0.0);
+
+ boolean failed = false;
+ try {
+ admin.topicPolicies().setPersistence(persistenceTopic,
persistence1);
+ } catch (PulsarAdminException e) {
+ failed = true;
+ Assert.assertEquals(e.getStatusCode(), 400);
+ }
+ assertTrue(failed);
+ admin.topics().delete(persistenceTopic);
+ }
+
@Test
public void testGetDispatchRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d2b459d91b1..84964680d34 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -69,6 +69,7 @@ import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.DataProvider;
/**
* Base class for all tests that need a Pulsar instance without a ZK and BK
cluster.
@@ -497,5 +498,17 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
}
}
+ @DataProvider(name = "invalidPersistentPolicies")
+ public Object[][] incorrectPersistentPolicies() {
+ return new Object[][] {
+ {0, 0, 0},
+ {1, 0, 0},
+ {0, 0, 1},
+ {0, 1, 0},
+ {1, 1, 0},
+ {1, 0, 1}
+ };
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 526b8818ada..7e72117a66f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1291,26 +1291,33 @@ public class CmdNamespaces extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-e",
- "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
- private int bookkeeperEnsemble;
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic")
+ private int bookkeeperEnsemble = 2;
@Parameter(names = { "-w",
- "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
- private int bookkeeperWriteQuorum;
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry")
+ private int bookkeeperWriteQuorum = 2;
@Parameter(names = { "-a",
"--bookkeeper-ack-quorum" },
- description = "Number of acks (guaranteed copies) to wait for
each entry", required = true)
- private int bookkeeperAckQuorum;
+ description = "Number of acks (guaranteed copies) to wait for
each entry")
+ private int bookkeeperAckQuorum = 2;
@Parameter(names = { "-r",
"--ml-mark-delete-max-rate" },
- description = "Throttling rate of mark-delete operation (0
means no throttle)", required = true)
- private double managedLedgerMaxMarkDeleteRate;
+ description = "Throttling rate of mark-delete operation (0
means no throttle)")
+ private double managedLedgerMaxMarkDeleteRate = 0;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
+ if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 ||
bookkeeperAckQuorum <= 0) {
+ throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ + "and [--bookkeeper-ack-quorum] must greater than
0.");
+ }
+ if (managedLedgerMaxMarkDeleteRate < 0) {
+ throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
+ }
getAdmin().namespaces().setPersistence(namespace, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index ba7d0ff77f0..9bd6c18b4ae 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1091,20 +1091,20 @@ public class CmdTopicPolicies extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-e",
- "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
- private int bookkeeperEnsemble;
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic")
+ private int bookkeeperEnsemble = 2;
@Parameter(names = { "-w",
- "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
- private int bookkeeperWriteQuorum;
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry")
+ private int bookkeeperWriteQuorum = 2;
@Parameter(names = { "-a", "--bookkeeper-ack-quorum" },
- description = "Number of acks (guaranteed copies) to wait for
each entry", required = true)
- private int bookkeeperAckQuorum;
+ description = "Number of acks (guaranteed copies) to wait for
each entry")
+ private int bookkeeperAckQuorum = 2;
@Parameter(names = { "-r", "--ml-mark-delete-max-rate" },
- description = "Throttling rate of mark-delete operation (0
means no throttle)", required = true)
- private double managedLedgerMaxMarkDeleteRate;
+ description = "Throttling rate of mark-delete operation (0
means no throttle)")
+ private double managedLedgerMaxMarkDeleteRate = 0;
@Parameter(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ "If set to true, the policy will be replicate to other
clusters asynchronously", arity = 0)
@@ -1113,6 +1113,13 @@ public class CmdTopicPolicies extends CmdBase {
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+ if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 ||
bookkeeperAckQuorum <= 0) {
+ throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ + "and [--bookkeeper-ack-quorum] must greater than
0.");
+ }
+ if (managedLedgerMaxMarkDeleteRate < 0) {
+ throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
+ }
getTopicPolicies(isGlobal).setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 8fc8d5646d7..056847c7583 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1988,26 +1988,32 @@ public class CmdTopics extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-e",
- "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
- private int bookkeeperEnsemble;
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic")
+ private int bookkeeperEnsemble = 2;
@Parameter(names = { "-w",
- "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
- private int bookkeeperWriteQuorum;
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry")
+ private int bookkeeperWriteQuorum = 2;
@Parameter(names = { "-a",
- "--bookkeeper-ack-quorum" }, description = "Number of acks
(guaranteed copies) to wait for each entry",
- required = true)
- private int bookkeeperAckQuorum;
+ "--bookkeeper-ack-quorum" }, description = "Number of acks
(guaranteed copies) to wait for each entry")
+ private int bookkeeperAckQuorum = 2;
@Parameter(names = { "-r",
"--ml-mark-delete-max-rate" }, description = "Throttling rate
of mark-delete operation "
- + "(0 means no throttle)", required = true)
- private double managedLedgerMaxMarkDeleteRate;
+ + "(0 means no throttle)")
+ private double managedLedgerMaxMarkDeleteRate = 0;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+ if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 ||
bookkeeperAckQuorum <= 0) {
+ throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ + "and [--bookkeeper-ack-quorum] must greater than
0.");
+ }
+ if (managedLedgerMaxMarkDeleteRate < 0) {
+ throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
+ }
getTopics().setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
}
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
new file mode 100644
index 00000000000..e420d7a5cef
--- /dev/null
+++
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch.client.elastic;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
+import co.elastic.clients.elasticsearch._types.Result;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.DeleteResponse;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.elasticsearch.indices.IndexSettings;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpHost;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
+import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
+import org.apache.pulsar.io.elasticsearch.client.RestClient;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.RestClientBuilder;
+import org.opensearch.action.bulk.BulkProcessor;
+
+@Slf4j
+public class ElasticSearchJavaRestClient extends RestClient {
+
+ private final ElasticsearchClient client;
+ private final ObjectMapper objectMapper = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, false)
+ .setSerializationInclusion(JsonInclude.Include.ALWAYS);
+ private BulkProcessor bulkProcessor;
+ private ElasticsearchTransport transport;
+
+ @VisibleForTesting
+ public void setBulkProcessor(BulkProcessor bulkProcessor) {
+ this.bulkProcessor = bulkProcessor;
+ }
+
+ @VisibleForTesting
+ public void setTransport(ElasticsearchTransport transport) {
+ this.transport = transport;
+ }
+
+ public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
+ BulkProcessor.Listener
bulkProcessorListener) {
+ super(elasticSearchConfig, bulkProcessorListener);
+
+ log.info("ElasticSearch URL {}", config.getElasticSearchUrl());
+ final HttpHost[] httpHosts = getHttpHosts();
+
+ RestClientBuilder builder =
org.elasticsearch.client.RestClient.builder(httpHosts)
+ .setRequestConfigCallback(builder1 -> builder1
+
.setContentCompressionEnabled(config.isCompressionEnabled())
+
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
+ .setConnectTimeout(config.getConnectTimeoutInMs())
+ .setSocketTimeout(config.getSocketTimeoutInMs()))
+ .setHttpClientConfigCallback(this.configCallback)
+ .setFailureListener(new
org.elasticsearch.client.RestClient.FailureListener() {
+ public void onFailure(Node node) {
+ log.warn("Node host={} failed", node.getHost());
+ }
+ });
+ transport = new RestClientTransport(builder.build(), new
JacksonJsonpMapper(objectMapper));
+ client = new ElasticsearchClient(transport);
+ if (elasticSearchConfig.isBulkEnabled()) {
+ bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig,
client, bulkProcessorListener);
+ } else {
+ bulkProcessor = null;
+ }
+ }
+
+ @Override
+ public boolean indexExists(String index) throws IOException {
+ final ExistsRequest request = new ExistsRequest.Builder()
+ .index(index)
+ .build();
+ return client.indices().exists(request).value();
+ }
+
+ @Override
+ public boolean createIndex(String index) throws IOException {
+ final CreateIndexRequest createIndexRequest = new
CreateIndexRequest.Builder()
+ .index(index)
+ .settings(new IndexSettings.Builder()
+ .numberOfShards(config.getIndexNumberOfShards() + "")
+ .numberOfReplicas(config.getIndexNumberOfReplicas() +
"")
+ .build()
+ )
+ .build();
+ try {
+ final CreateIndexResponse createIndexResponse =
client.indices().create(createIndexRequest);
+ if ((createIndexResponse.acknowledged())
+ && createIndexResponse.shardsAcknowledged()) {
+ return true;
+ }
+ throw new IOException("Unable to create index, acknowledged: " +
createIndexResponse.acknowledged()
+ + " shardsAcknowledged: " +
createIndexResponse.shardsAcknowledged());
+ } catch (ElasticsearchException ex) {
+ final String errorType =
Objects.requireNonNull(ex.response().error().type());
+ if (errorType.contains("resource_already_exists_exception")) {
+ return false;
+ }
+ throw ex;
+ }
+ }
+
+ @Override
+ public boolean deleteIndex(String index) throws IOException {
+ return client.indices().delete(new
DeleteIndexRequest.Builder().index(index).build()).acknowledged();
+ }
+
+ @Override
+ public boolean deleteDocument(String index, String documentId) throws
IOException {
+ final DeleteRequest req = new
+ DeleteRequest.Builder()
+ .index(config.getIndexName())
+ .id(documentId)
+ .build();
+
+ DeleteResponse deleteResponse = client.delete(req);
+ return deleteResponse.result().equals(Result.Deleted) ||
deleteResponse.result().equals(Result.NotFound);
+ }
+
+ @Override
+ public boolean indexDocument(String index, String documentId, String
documentSource) throws IOException {
+ final Map mapped = objectMapper.readValue(documentSource, Map.class);
+ final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
+ .index(config.getIndexName())
+ .document(mapped)
+ .id(documentId)
+ .build();
+ final IndexResponse indexResponse = client.index(indexRequest);
+
+ return indexResponse.result().equals(Result.Created) ||
indexResponse.result().equals(Result.Updated);
+ }
+
+ public SearchResponse<Map> search(String indexName) throws IOException {
+ return search(indexName, "*:*");
+ }
+
+ @VisibleForTesting
+ public SearchResponse<Map> search(String indexName, String query) throws
IOException {
+ final RefreshRequest refreshRequest = new
RefreshRequest.Builder().index(indexName).build();
+ client.indices().refresh(refreshRequest);
+
+ query = query.replace("/", "\\/");
+ return client.search(new SearchRequest.Builder().index(indexName)
+ .q(query)
+ .build(), Map.class);
+ }
+
+ @Override
+ public long totalHits(String indexName) throws IOException {
+ return totalHits(indexName, "*:*");
+ }
+
+ @Override
+ public long totalHits(String indexName, String query) throws IOException {
+ final SearchResponse<Map> searchResponse = search(indexName, query);
+ return searchResponse.hits().total().value();
+ }
+
+ @Override
+ public BulkProcessor getBulkProcessor() {
+ if (bulkProcessor == null) {
+ throw new IllegalStateException("bulkProcessor not enabled");
+ }
+ return bulkProcessor;
+ }
+
+ @Override
+ public void closeClient() {
+ if (bulkProcessor != null) {
+ bulkProcessor.close();
+ }
+ // client doesn't need to be closed, only the transport instance
+ try {
+ transport.close();
+ } catch (IOException e) {
+ log.warn("error while closing the client", e);
+ }
+ }
+
+ @VisibleForTesting
+ public ElasticsearchClient getClient() {
+ return client;
+ }
+
+ @VisibleForTesting
+ public ElasticsearchTransport getTransport() {
+ return transport;
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
new file mode 100644
index 00000000000..65b38c677bf
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import java.util.Optional;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
+
+ public static final String ELASTICSEARCH_7 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
+
+
+ public ElasticSearch7SinkTester(boolean schemaEnable) {
+ super(schemaEnable);
+ }
+
+ @Override
+ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ return new ElasticsearchContainer(ELASTICSEARCH_7)
+ .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
+ }
+
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
new file mode 100644
index 00000000000..bb52c4ff03f
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import java.util.Optional;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
+
+ public static final String ELASTICSEARCH_8 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+
+
+ public ElasticSearch8SinkTester(boolean schemaEnable) {
+ super(schemaEnable);
+ }
+
+ @Override
+ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ return new ElasticsearchContainer(ELASTICSEARCH_8)
+ .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
+ .withEnv("xpack.security.enabled", "false")
+ .withEnv("xpack.security.http.ssl.enabled", "false");
+ }
+
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
new file mode 100644
index 00000000000..1e10cc4189c
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import java.util.Optional;
+import org.apache.http.HttpHost;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.awaitility.Awaitility;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Map;
+
+import static org.testng.Assert.assertTrue;
+
+public class OpenSearchSinkTester extends ElasticSearchSinkTester {
+
+ public static final String OPENSEARCH =
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
+ .orElse("opensearchproject/opensearch:1.2.4");
+
+ private RestHighLevelClient elasticClient;
+
+
+ public OpenSearchSinkTester(boolean schemaEnable) {
+ super(schemaEnable);
+ }
+
+ @Override
+ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
+
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+ return new ElasticsearchContainer(dockerImageName)
+ .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
+ .withEnv("bootstrap.memory_lock", "true")
+ .withEnv("plugins.security.disabled", "true");
+ }
+
+ @Override
+ public void prepareSink() throws Exception {
+ RestClientBuilder builder = RestClient.builder(
+ new HttpHost(
+ "localhost",
+ serviceContainer.getMappedPort(9200),
+ "http"));
+ elasticClient = new RestHighLevelClient(builder);
+ }
+
+ @Override
+ public void validateSinkResult(Map<String, String> kvs) {
+ org.opensearch.action.search.SearchRequest searchRequest = new
SearchRequest("test-index");
+
+ Awaitility.await().untilAsserted(() -> {
+ SearchResponse searchResult = elasticClient.search(searchRequest,
RequestOptions.DEFAULT);
+ assertTrue(searchResult.getHits().getTotalHits().value > 0,
searchResult.toString());
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (elasticClient != null) {
+ elasticClient.close();
+ elasticClient = null;
+ }
+ }
+}