This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 67d2c29d382 [fix][admin] Fix `validatePersistencePolicies` that Namespace/Topic persistent policies cannot set to < 0  (#18999)
67d2c29d382 is described below

commit 67d2c29d3821ad2cdf6b26d3132861d32757412a
Author: Tao Jiuming <[email protected]>
AuthorDate: Sat Jan 28 11:12:24 2023 +0800

    [fix][admin] Fix `validatePersistencePolicies` that Namespace/Topic persistent policies cannot set to < 0  (#18999)
    
    (cherry picked from commit 4cae20ce9e0bd6be9843893abff2866a03a4556f)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  15 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  12 ++
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  16 ++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  13 ++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  23 ++-
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |  23 ++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  24 ++-
 .../elastic/ElasticSearchJavaRestClient.java       | 224 +++++++++++++++++++++
 .../io/sinks/ElasticSearch7SinkTester.java         |  41 ++++
 .../io/sinks/ElasticSearch8SinkTester.java         |  43 ++++
 .../integration/io/sinks/OpenSearchSinkTester.java |  88 ++++++++
 11 files changed, 491 insertions(+), 31 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 305266ce799..0e252cac61b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -767,12 +767,15 @@ public abstract class AdminResource extends 
PulsarWebResource {
     protected void validatePersistencePolicies(PersistencePolicies 
persistence) {
         checkNotNull(persistence, "persistence policies should not be null");
         final ServiceConfiguration config = pulsar().getConfiguration();
-        checkArgument(persistence.getBookkeeperEnsemble() <= 
config.getManagedLedgerMaxEnsembleSize(),
-                "Bookkeeper-Ensemble must be <= " + 
config.getManagedLedgerMaxEnsembleSize());
-        checkArgument(persistence.getBookkeeperWriteQuorum() <= 
config.getManagedLedgerMaxWriteQuorum(),
-                "Bookkeeper-WriteQuorum must be <= " + 
config.getManagedLedgerMaxWriteQuorum());
-        checkArgument(persistence.getBookkeeperAckQuorum() <= 
config.getManagedLedgerMaxAckQuorum(),
-                "Bookkeeper-AckQuorum must be <= " + 
config.getManagedLedgerMaxAckQuorum());
+        checkArgument(persistence.getBookkeeperEnsemble() <= 
config.getManagedLedgerMaxEnsembleSize()
+                        && persistence.getBookkeeperEnsemble() > 0,
+                "Bookkeeper-Ensemble must be <= " + 
config.getManagedLedgerMaxEnsembleSize() + " and > 0.");
+        checkArgument(persistence.getBookkeeperWriteQuorum() <= 
config.getManagedLedgerMaxWriteQuorum()
+                        && persistence.getBookkeeperWriteQuorum() > 0,
+                "Bookkeeper-WriteQuorum must be <= " + 
config.getManagedLedgerMaxWriteQuorum() + " and > 0.");
+        checkArgument(persistence.getBookkeeperAckQuorum() <= 
config.getManagedLedgerMaxAckQuorum()
+                        && persistence.getBookkeeperAckQuorum() > 0,
+                "Bookkeeper-AckQuorum must be <= " + 
config.getManagedLedgerMaxAckQuorum() + " and > 0.");
         checkArgument(
                 (persistence.getBookkeeperEnsemble() >= 
persistence.getBookkeeperWriteQuorum())
                         && (persistence.getBookkeeperWriteQuorum() >= 
persistence.getBookkeeperAckQuorum()),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index fdc218a0f05..39fe0727a7f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1069,6 +1069,18 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(persistence2, persistence1);
     }
 
+    /**
+     * Checks that {@code namespaces.setPersistence} fails with a {@link RestException} carrying
+     * status 400 (Bad Request) for every ensemble/write-quorum/ack-quorum triple supplied by the
+     * {@code invalidPersistentPolicies} data provider.
+     */
+    @Test(dataProvider = "invalidPersistentPolicies")
+    public void testSetIncorrectPersistentPolicies(int ensembleSize, int writeQuorum, int ackQuorum) throws Exception {
+        NamespaceName testNs = this.testLocalNamespaces.get(0);
+        PersistencePolicies invalidPolicies = new PersistencePolicies(ensembleSize, writeQuorum, ackQuorum, 0.0);
+        RestException rejection = null;
+        try {
+            namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), invalidPolicies);
+        } catch (RestException e) {
+            rejection = e;
+        }
+        if (rejection == null) {
+            // The request was accepted even though the policies are invalid.
+            fail();
+        }
+        assertEquals(rejection.getResponse().getStatus(), Status.BAD_REQUEST.getStatusCode());
+    }
+
     @Test
     public void testPersistenceUnauthorized() throws Exception {
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index b676eda6746..36f27877360 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -874,6 +874,22 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         consumer.close();
     }
 
+    /**
+     * Checks that setting topic-level persistence policies fails with status code 400 for every
+     * ensemble/write-quorum/ack-quorum triple supplied by the {@code invalidPersistentPolicies}
+     * data provider.
+     */
+    @Test(dataProvider = "invalidPersistentPolicies")
+    public void testSetIncorrectPersistentPolicies(int ensembleSize, int writeQuorum, int ackQuorum) throws Exception {
+        admin.topics().createNonPartitionedTopic(persistenceTopic);
+        // Clean up in a finally block: this method runs once per data-provider row, and a failed
+        // assertion must not leave the topic behind for the next row's create call.
+        try {
+            PersistencePolicies persistence1 = new PersistencePolicies(ensembleSize, writeQuorum, ackQuorum, 0.0);
+
+            boolean failed = false;
+            try {
+                admin.topicPolicies().setPersistence(persistenceTopic, persistence1);
+            } catch (PulsarAdminException e) {
+                failed = true;
+                Assert.assertEquals(e.getStatusCode(), 400);
+            }
+            assertTrue(failed);
+        } finally {
+            admin.topics().delete(persistenceTopic);
+        }
+    }
+
     @Test
     public void testGetDispatchRateApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d2b459d91b1..84964680d34 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -69,6 +69,7 @@ import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.DataProvider;
 
 /**
  * Base class for all tests that need a Pulsar instance without a ZK and BK 
cluster.
@@ -497,5 +498,17 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         }
     }
 
+    /**
+     * Supplies {@code {ensembleSize, writeQuorum, ackQuorum}} triples in which at least one value
+     * is zero or negative.
+     *
+     * @return the rows fed to tests that declare {@code dataProvider = "invalidPersistentPolicies"}
+     */
+    @DataProvider(name = "invalidPersistentPolicies")
+    public Object[][] incorrectPersistentPolicies() {
+        return new Object[][] {
+                // At least one value is zero.
+                {0, 0, 0},
+                {1, 0, 0},
+                {0, 0, 1},
+                {0, 1, 0},
+                {1, 1, 0},
+                {1, 0, 1},
+                // At least one value is negative.
+                {-1, -1, -1},
+                {-1, 1, 1},
+                {1, -1, 1},
+                {1, 1, -1}
+        };
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 526b8818ada..7e72117a66f 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1291,26 +1291,33 @@ public class CmdNamespaces extends CmdBase {
         private java.util.List<String> params;
 
         @Parameter(names = { "-e",
-                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic", required = true)
-        private int bookkeeperEnsemble;
+                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic")
+        private int bookkeeperEnsemble = 2;
 
         @Parameter(names = { "-w",
-                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry", required = true)
-        private int bookkeeperWriteQuorum;
+                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry")
+        private int bookkeeperWriteQuorum = 2;
 
         @Parameter(names = { "-a",
                 "--bookkeeper-ack-quorum" },
-                description = "Number of acks (guaranteed copies) to wait for 
each entry", required = true)
-        private int bookkeeperAckQuorum;
+                description = "Number of acks (guaranteed copies) to wait for 
each entry")
+        private int bookkeeperAckQuorum = 2;
 
         @Parameter(names = { "-r",
                 "--ml-mark-delete-max-rate" },
-                description = "Throttling rate of mark-delete operation (0 
means no throttle)", required = true)
-        private double managedLedgerMaxMarkDeleteRate;
+                description = "Throttling rate of mark-delete operation (0 
means no throttle)")
+        private double managedLedgerMaxMarkDeleteRate = 0;
 
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
+            // Reject non-positive ensemble/quorum values and a negative mark-delete rate on the
+            // client, before the admin request is issued.
+            if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 || bookkeeperAckQuorum <= 0) {
+                throw new ParameterException("[--bookkeeper-ensemble], [--bookkeeper-write-quorum] "
+                        + "and [--bookkeeper-ack-quorum] must be greater than 0.");
+            }
+            if (managedLedgerMaxMarkDeleteRate < 0) {
+                throw new ParameterException("[--ml-mark-delete-max-rate] cannot be less than 0.");
+            }
             getAdmin().namespaces().setPersistence(namespace, new PersistencePolicies(bookkeeperEnsemble,
                     bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate));
         }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index ba7d0ff77f0..9bd6c18b4ae 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1091,20 +1091,20 @@ public class CmdTopicPolicies extends CmdBase {
         private java.util.List<String> params;
 
         @Parameter(names = { "-e",
-                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic", required = true)
-        private int bookkeeperEnsemble;
+                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic")
+        private int bookkeeperEnsemble = 2;
 
         @Parameter(names = { "-w",
-                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry", required = true)
-        private int bookkeeperWriteQuorum;
+                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry")
+        private int bookkeeperWriteQuorum = 2;
 
         @Parameter(names = { "-a", "--bookkeeper-ack-quorum" },
-                description = "Number of acks (guaranteed copies) to wait for 
each entry", required = true)
-        private int bookkeeperAckQuorum;
+                description = "Number of acks (guaranteed copies) to wait for 
each entry")
+        private int bookkeeperAckQuorum = 2;
 
         @Parameter(names = { "-r", "--ml-mark-delete-max-rate" },
-                description = "Throttling rate of mark-delete operation (0 
means no throttle)", required = true)
-        private double managedLedgerMaxMarkDeleteRate;
+                description = "Throttling rate of mark-delete operation (0 
means no throttle)")
+        private double managedLedgerMaxMarkDeleteRate = 0;
 
         @Parameter(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
                 + "If set to true, the policy will be replicate to other 
clusters asynchronously", arity = 0)
@@ -1113,6 +1113,13 @@ public class CmdTopicPolicies extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
+            // Reject non-positive ensemble/quorum values and a negative mark-delete rate on the
+            // client, before the admin request is issued.
+            if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 || bookkeeperAckQuorum <= 0) {
+                throw new ParameterException("[--bookkeeper-ensemble], [--bookkeeper-write-quorum] "
+                        + "and [--bookkeeper-ack-quorum] must be greater than 0.");
+            }
+            if (managedLedgerMaxMarkDeleteRate < 0) {
+                throw new ParameterException("[--ml-mark-delete-max-rate] cannot be less than 0.");
+            }
             getTopicPolicies(isGlobal).setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble,
                     bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate));
         }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 8fc8d5646d7..056847c7583 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1988,26 +1988,32 @@ public class CmdTopics extends CmdBase {
         private java.util.List<String> params;
 
         @Parameter(names = { "-e",
-                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic", required = true)
-        private int bookkeeperEnsemble;
+                "--bookkeeper-ensemble" }, description = "Number of bookies to 
use for a topic")
+        private int bookkeeperEnsemble = 2;
 
         @Parameter(names = { "-w",
-                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry", required = true)
-        private int bookkeeperWriteQuorum;
+                "--bookkeeper-write-quorum" }, description = "How many writes 
to make of each entry")
+        private int bookkeeperWriteQuorum = 2;
 
         @Parameter(names = { "-a",
-                "--bookkeeper-ack-quorum" }, description = "Number of acks 
(guaranteed copies) to wait for each entry",
-                required = true)
-        private int bookkeeperAckQuorum;
+                "--bookkeeper-ack-quorum" }, description = "Number of acks 
(guaranteed copies) to wait for each entry")
+        private int bookkeeperAckQuorum = 2;
 
         @Parameter(names = { "-r",
                 "--ml-mark-delete-max-rate" }, description = "Throttling rate 
of mark-delete operation "
-                + "(0 means no throttle)", required = true)
-        private double managedLedgerMaxMarkDeleteRate;
+                + "(0 means no throttle)")
+        private double managedLedgerMaxMarkDeleteRate = 0;
 
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
+            // Reject non-positive ensemble/quorum values and a negative mark-delete rate on the
+            // client, before the admin request is issued.
+            if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 || bookkeeperAckQuorum <= 0) {
+                throw new ParameterException("[--bookkeeper-ensemble], [--bookkeeper-write-quorum] "
+                        + "and [--bookkeeper-ack-quorum] must be greater than 0.");
+            }
+            if (managedLedgerMaxMarkDeleteRate < 0) {
+                throw new ParameterException("[--ml-mark-delete-max-rate] cannot be less than 0.");
+            }
             getTopics().setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble,
                     bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate));
         }
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
new file mode 100644
index 00000000000..e420d7a5cef
--- /dev/null
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch.client.elastic;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
+import co.elastic.clients.elasticsearch._types.Result;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.DeleteResponse;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.elasticsearch.indices.IndexSettings;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpHost;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
+import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
+import org.apache.pulsar.io.elasticsearch.client.RestClient;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.RestClientBuilder;
+import org.opensearch.action.bulk.BulkProcessor;
+
+/**
+ * {@link RestClient} implementation that talks to the cluster through the Elasticsearch Java API
+ * client ({@link ElasticsearchClient}) on top of a {@link RestClientTransport}.
+ *
+ * <p>NOTE(review): this file imports both
+ * {@code org.apache.pulsar.io.elasticsearch.client.BulkProcessor} and
+ * {@code org.opensearch.action.bulk.BulkProcessor}. Two single-type imports with the same simple
+ * name do not compile, so one of them (presumably the OpenSearch one) has to be dropped.
+ */
+@Slf4j
+public class ElasticSearchJavaRestClient extends RestClient {
+
+    private final ElasticsearchClient client;
+    private final ObjectMapper objectMapper = new ObjectMapper()
+            .configure(SerializationFeature.INDENT_OUTPUT, false)
+            .setSerializationInclusion(JsonInclude.Include.ALWAYS);
+    private BulkProcessor bulkProcessor;
+    private ElasticsearchTransport transport;
+
+    /** Replaces the bulk processor; intended for tests only. */
+    @VisibleForTesting
+    public void setBulkProcessor(BulkProcessor bulkProcessor) {
+        this.bulkProcessor = bulkProcessor;
+    }
+
+    /** Replaces the transport; intended for tests only. */
+    @VisibleForTesting
+    public void setTransport(ElasticsearchTransport transport) {
+        this.transport = transport;
+    }
+
+    /**
+     * Builds the low-level REST client, the transport and the API client from the sink
+     * configuration, and creates a bulk processor when bulk mode is enabled.
+     *
+     * @param elasticSearchConfig   sink configuration (timeouts, compression, bulk settings)
+     * @param bulkProcessorListener listener handed to the base class and to the bulk processor
+     */
+    public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
+                                       BulkProcessor.Listener bulkProcessorListener) {
+        super(elasticSearchConfig, bulkProcessorListener);
+
+        log.info("ElasticSearch URL {}", config.getElasticSearchUrl());
+        final HttpHost[] httpHosts = getHttpHosts();
+
+        RestClientBuilder builder = org.elasticsearch.client.RestClient.builder(httpHosts)
+                .setRequestConfigCallback(builder1 -> builder1
+                        .setContentCompressionEnabled(config.isCompressionEnabled())
+                        .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
+                        .setConnectTimeout(config.getConnectTimeoutInMs())
+                        .setSocketTimeout(config.getSocketTimeoutInMs()))
+                .setHttpClientConfigCallback(this.configCallback)
+                .setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() {
+                    @Override
+                    public void onFailure(Node node) {
+                        log.warn("Node host={} failed", node.getHost());
+                    }
+                });
+        transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper(objectMapper));
+        client = new ElasticsearchClient(transport);
+        if (elasticSearchConfig.isBulkEnabled()) {
+            bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, client, bulkProcessorListener);
+        } else {
+            bulkProcessor = null;
+        }
+    }
+
+    /** Returns whether the given index exists. */
+    @Override
+    public boolean indexExists(String index) throws IOException {
+        final ExistsRequest request = new ExistsRequest.Builder()
+                .index(index)
+                .build();
+        return client.indices().exists(request).value();
+    }
+
+    /**
+     * Creates the index with the configured number of shards and replicas.
+     *
+     * @return {@code true} when the creation and its shards were acknowledged, {@code false} when
+     *         the server reports that the index already exists
+     * @throws IOException when the creation is not fully acknowledged
+     * @throws ElasticsearchException for any other server-side error
+     */
+    @Override
+    public boolean createIndex(String index) throws IOException {
+        final CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+                .index(index)
+                .settings(new IndexSettings.Builder()
+                        .numberOfShards(config.getIndexNumberOfShards() + "")
+                        .numberOfReplicas(config.getIndexNumberOfReplicas() + "")
+                        .build()
+                )
+                .build();
+        try {
+            final CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
+            if ((createIndexResponse.acknowledged())
+                    && createIndexResponse.shardsAcknowledged()) {
+                return true;
+            }
+            throw new IOException("Unable to create index, acknowledged: " + createIndexResponse.acknowledged()
+                    + " shardsAcknowledged: " + createIndexResponse.shardsAcknowledged());
+        } catch (ElasticsearchException ex) {
+            final String errorType = Objects.requireNonNull(ex.response().error().type());
+            if (errorType.contains("resource_already_exists_exception")) {
+                return false;
+            }
+            throw ex;
+        }
+    }
+
+    /** Deletes the index and returns whether the deletion was acknowledged. */
+    @Override
+    public boolean deleteIndex(String index) throws IOException {
+        return client.indices().delete(new DeleteIndexRequest.Builder().index(index).build()).acknowledged();
+    }
+
+    /**
+     * Deletes a document from the given index.
+     *
+     * @return {@code true} when the result is {@code Deleted} or {@code NotFound}
+     */
+    @Override
+    public boolean deleteDocument(String index, String documentId) throws IOException {
+        // Target the index chosen by the caller; the configured index name was used here before,
+        // which silently ignored the argument.
+        final DeleteRequest req = new
+                DeleteRequest.Builder()
+                .index(index)
+                .id(documentId)
+                .build();
+
+        DeleteResponse deleteResponse = client.delete(req);
+        return deleteResponse.result().equals(Result.Deleted) || deleteResponse.result().equals(Result.NotFound);
+    }
+
+    /**
+     * Indexes a JSON document under the given id in the given index.
+     *
+     * @param documentSource the document as a JSON string; it is parsed into a {@link Map} first
+     * @return {@code true} when the result is {@code Created} or {@code Updated}
+     */
+    @Override
+    public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
+        final Map mapped = objectMapper.readValue(documentSource, Map.class);
+        // Target the index chosen by the caller; the configured index name was used here before,
+        // which silently ignored the argument.
+        final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
+                .index(index)
+                .document(mapped)
+                .id(documentId)
+                .build();
+        final IndexResponse indexResponse = client.index(indexRequest);
+
+        return indexResponse.result().equals(Result.Created) || indexResponse.result().equals(Result.Updated);
+    }
+
+    /** Searches the index with the match-everything query {@code *:*}. */
+    public SearchResponse<Map> search(String indexName) throws IOException {
+        return search(indexName, "*:*");
+    }
+
+    /**
+     * Refreshes the index, then runs the query-string search against it. Every {@code /} in the
+     * query is escaped as {@code \/} before the request is sent.
+     */
+    @VisibleForTesting
+    public SearchResponse<Map> search(String indexName, String query) throws IOException {
+        final RefreshRequest refreshRequest = new RefreshRequest.Builder().index(indexName).build();
+        client.indices().refresh(refreshRequest);
+
+        query = query.replace("/", "\\/");
+        return client.search(new SearchRequest.Builder().index(indexName)
+                .q(query)
+                .build(), Map.class);
+    }
+
+    /** Returns the total hit count of the match-everything query {@code *:*}. */
+    @Override
+    public long totalHits(String indexName) throws IOException {
+        return totalHits(indexName, "*:*");
+    }
+
+    /** Returns the total hit count reported for the given query. */
+    @Override
+    public long totalHits(String indexName, String query) throws IOException {
+        final SearchResponse<Map> searchResponse = search(indexName, query);
+        return searchResponse.hits().total().value();
+    }
+
+    /**
+     * Returns the bulk processor.
+     *
+     * @throws IllegalStateException when no bulk processor is set
+     */
+    @Override
+    public BulkProcessor getBulkProcessor() {
+        if (bulkProcessor == null) {
+            throw new IllegalStateException("bulkProcessor not enabled");
+        }
+        return bulkProcessor;
+    }
+
+    /** Closes the bulk processor, if any, and then the transport. */
+    @Override
+    public void closeClient() {
+        try {
+            if (bulkProcessor != null) {
+                bulkProcessor.close();
+            }
+        } finally {
+            // Close the transport even when the bulk processor fails to close.
+            // client doesn't need to be closed, only the transport instance
+            try {
+                transport.close();
+            } catch (IOException e) {
+                log.warn("error while closing the client", e);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    public ElasticsearchClient getClient() {
+        return client;
+    }
+
+    @VisibleForTesting
+    public ElasticsearchTransport getTransport() {
+        return transport;
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
new file mode 100644
index 00000000000..65b38c677bf
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import java.util.Optional;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+/**
+ * Sink tester that starts an Elasticsearch 7 container. The image is read from the
+ * {@code ELASTICSEARCH_IMAGE_V7} environment variable, with a fixed 7.17.7 image as fallback.
+ */
+public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
+
+    private static final String IMAGE_ENV_VARIABLE = "ELASTICSEARCH_IMAGE_V7";
+    private static final String FALLBACK_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.17.7";
+
+    public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv(IMAGE_ENV_VARIABLE))
+            .orElse(FALLBACK_IMAGE);
+
+    public ElasticSearch7SinkTester(boolean schemaEnable) {
+        super(schemaEnable);
+    }
+
+    /** Creates the container from {@link #ELASTICSEARCH_7} with the JVM heap options set. */
+    @Override
+    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+        final ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_7);
+        return container.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
+    }
+
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
new file mode 100644
index 00000000000..bb52c4ff03f
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import java.util.Optional;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+/**
+ * Sink tester that starts an Elasticsearch 8 container. The image is read from the
+ * {@code ELASTICSEARCH_IMAGE_V8} environment variable, with a fixed 8.5.1 image as fallback.
+ */
+public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
+
+    private static final String IMAGE_ENV_VARIABLE = "ELASTICSEARCH_IMAGE_V8";
+    private static final String FALLBACK_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:8.5.1";
+
+    public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv(IMAGE_ENV_VARIABLE))
+            .orElse(FALLBACK_IMAGE);
+
+    public ElasticSearch8SinkTester(boolean schemaEnable) {
+        super(schemaEnable);
+    }
+
+    /**
+     * Creates the container from {@link #ELASTICSEARCH_8} with the JVM heap options set and the
+     * two {@code xpack.security} settings switched to {@code false}.
+     */
+    @Override
+    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+        final ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_8);
+        return container
+                .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("xpack.security.http.ssl.enabled", "false");
+    }
+
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
new file mode 100644
index 00000000000..1e10cc4189c
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import java.util.Optional;
+import org.apache.http.HttpHost;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.awaitility.Awaitility;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Map;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Sink tester that runs an OpenSearch image inside an {@link ElasticsearchContainer} and checks
+ * the sink output through an OpenSearch {@link RestHighLevelClient}. The image is read from the
+ * {@code OPENSEARCH_IMAGE} environment variable, with a fixed 1.2.4 image as fallback.
+ */
+public class OpenSearchSinkTester extends ElasticSearchSinkTester {
+
+    public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
+            .orElse("opensearchproject/opensearch:1.2.4");
+
+    private RestHighLevelClient elasticClient;
+
+
+    public OpenSearchSinkTester(boolean schemaEnable) {
+        super(schemaEnable);
+    }
+
+    /**
+     * Creates the container. The OpenSearch image is declared a compatible substitute for the
+     * Elasticsearch image so that {@link ElasticsearchContainer} accepts it.
+     */
+    @Override
+    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+        DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
+                .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+        return new ElasticsearchContainer(dockerImageName)
+                .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
+                .withEnv("bootstrap.memory_lock", "true")
+                .withEnv("plugins.security.disabled", "true");
+    }
+
+    /** Opens the verification client against {@code localhost} and the mapped port for 9200. */
+    @Override
+    public void prepareSink() throws Exception {
+        RestClientBuilder builder = RestClient.builder(
+                new HttpHost(
+                        "localhost",
+                        serviceContainer.getMappedPort(9200),
+                        "http"));
+        elasticClient = new RestHighLevelClient(builder);
+    }
+
+    /**
+     * Waits until a search on {@code test-index} reports at least one hit.
+     *
+     * @param kvs not used by this implementation
+     */
+    @Override
+    public void validateSinkResult(Map<String, String> kvs) {
+        final SearchRequest searchRequest = new SearchRequest("test-index");
+
+        Awaitility.await().untilAsserted(() -> {
+            SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
+            assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
+        });
+    }
+
+    /** Closes the parent resources and then the verification client. */
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            // Release the REST client even when closing the parent resources fails.
+            if (elasticClient != null) {
+                elasticClient.close();
+                elasticClient = null;
+            }
+        }
+    }
+}

Reply via email to