eolivelli commented on a change in pull request #11263:
URL: https://github.com/apache/pulsar/pull/11263#discussion_r673067706



##########
File path: 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
##########
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Record;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.*;

Review comment:
       Sure, fixed.

##########
File path: 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,143 +18,273 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
-/**
- * The base abstract class for ElasticSearch sinks.
- * Users need to implement extractKeyValue function to use this sink.
- * This class assumes that the input will be JSON documents
- */
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticSearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
     private ElasticSearchConfig elasticSearchConfig;
+    private ElasticSearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private List<String> primaryFields = null;
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         elasticSearchConfig = ElasticSearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
+        if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+            primaryFields = 
Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+        }
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = 
Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, 
RequestOptions.DEFAULT);
-            if 
(indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            Pair<String, String> idAndDoc = extractIdAndDocument(record);
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("index doc {} {}", idAndDoc.getLeft(), 
idAndDoc.getRight());
+                }
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (idAndDoc.getLeft() != null) {
+                                if (elasticSearchConfig.isBulkEnabled()) {
+                                    elasticsearchClient.bulkDelete(record, 
idAndDoc.getLeft());
+                                } else {
+                                    elasticsearchClient.deleteDocument(record, 
idAndDoc.getLeft());
+                                }
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new 
PulsarClientException.InvalidMessageException("Unexpected null message value"));

Review comment:
       In the `FAIL` branch we throw `elasticsearchClient.irrecoverableError.get()`.
   Shouldn't this case be handled with `record.fail()` instead?

##########
File path: 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,143 +18,273 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
-/**
- * The base abstract class for ElasticSearch sinks.
- * Users need to implement extractKeyValue function to use this sink.
- * This class assumes that the input will be JSON documents
- */
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticSearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
     private ElasticSearchConfig elasticSearchConfig;
+    private ElasticSearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private List<String> primaryFields = null;
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         elasticSearchConfig = ElasticSearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
+        if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+            primaryFields = 
Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+        }
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = 
Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, 
RequestOptions.DEFAULT);
-            if 
(indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            Pair<String, String> idAndDoc = extractIdAndDocument(record);
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("index doc {} {}", idAndDoc.getLeft(), 
idAndDoc.getRight());
+                }
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (idAndDoc.getLeft() != null) {
+                                if (elasticSearchConfig.isBulkEnabled()) {
+                                    elasticsearchClient.bulkDelete(record, 
idAndDoc.getLeft());
+                                } else {
+                                    elasticsearchClient.deleteDocument(record, 
idAndDoc.getLeft());
+                                }
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new 
PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (JsonProcessingException jsonProcessingException) {
+                switch (elasticSearchConfig.getMalformedDocAction()) {
+                    case IGNORE:
+                        break;
+                    case WARN:
+                        log.warn("Ignoring malformed document messageId={}",
+                                
record.getMessage().map(Message::getMessageId).orElse(null),
+                                jsonProcessingException);
+                        elasticsearchClient.failed(jsonProcessingException);
+                        throw jsonProcessingException;
+                    case FAIL:
+                        log.error("Malformed document messageId={}",
+                                
record.getMessage().map(Message::getMessageId).orElse(null),
+                                jsonProcessingException);
+                        elasticsearchClient.failed(jsonProcessingException);
+                        throw jsonProcessingException;
+                }
+            } catch (Exception e) {
+                log.error("write error for {} {}:", idAndDoc.getLeft(), 
idAndDoc.getRight(), e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
+        } else {
+            throw new IllegalStateException("Elasticsearch client is in FAILED 
status");
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticSearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, 
RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> 
record) throws JsonProcessingException {
+        if (elasticSearchConfig.isSchemaEnable()) {
+            Object key = null;
+            GenericObject value = null;
+            Schema<?> keySchema = null;
+            Schema<?> valueSchema = null;
+            if (record.getSchema() != null && record.getSchema() instanceof 
KeyValueSchema) {
+                KeyValueSchema<GenericObject, GenericObject> keyValueSchema = 
(KeyValueSchema) record.getSchema();
+                keySchema = keyValueSchema.getKeySchema();
+                valueSchema = keyValueSchema.getValueSchema();
+                KeyValue<GenericObject, GenericObject> keyValue = 
(KeyValue<GenericObject, GenericObject>) record.getValue().getNativeObject();
+                key = keyValue.getKey();
+                value = keyValue.getValue();
+            } else {
+                key = record.getKey().orElse(null);
+                valueSchema = record.getSchema();
+                value = record.getValue();
+            }
+
+            String id = null;
+            if (elasticSearchConfig.isKeyIgnore() == false && key != null && 
keySchema != null) {
+                id = stringifyKey(keySchema, key);
+            }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new 
CreateIndexRequest(elasticSearchConfig.getIndexName());
+            String doc = null;
+            if (value != null) {
+                if (valueSchema != null) {
+                    doc = stringifyValue(valueSchema, value);
+                } else {
+                    if (value.getNativeObject() instanceof byte[]) {
+                        // for BWC with the ES-Sink
+                        doc = new String((byte[]) value.getNativeObject(), 
StandardCharsets.UTF_8);
+                    } else {
+                        doc = value.getNativeObject().toString();
+                    }
+                }
+            }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", 
elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", 
elasticSearchConfig.getIndexNumberOfReplicas()));
+            if (doc != null && primaryFields != null) {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    id = stringifyKey(jsonNode, primaryFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                    throw e;
+                }
+            }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, 
RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+            if (log.isDebugEnabled()) {
+                SchemaType schemaType = null;
+                if (record.getSchema() != null && 
record.getSchema().getSchemaInfo() != null) {
+                    schemaType = record.getSchema().getSchemaInfo().getType();
+                }
+                log.debug("recordType={} schemaType={} id={} doc={}",
+                        record.getClass().getName(),
+                        schemaType,
+                        id,
+                        doc);
             }
+            return Pair.of(id, doc);
+    } else {
+        return Pair.of(null, new String(

Review comment:
       We must keep `_id` null here: with `schemaEnable = false` the sink has to
behave exactly as it did in Pulsar 2.8.0, which did not handle the primary key
of the ES document.
   

##########
File path: 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
##########
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Record;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+public class ElasticSearchClient {
+
+    static final String[] malformedErrors = {
+            "mapper_parsing_exception",
+            "action_request_validation_exception",
+            "illegal_argument_exception"
+    };
+
+    private ElasticSearchConfig config;
+    private ConfigCallback configCallback;
+    private RestHighLevelClient client;
+
+    final Set<String> indexCache = new HashSet<>();
+    final Map<String, String> topicToIndexCache = new HashMap<>();
+
+    final RandomExponentialRetry backoffRetry;
+    final BulkProcessor bulkProcessor;
+    final ConcurrentMap<DocWriteRequest<?>, Record> records = new 
ConcurrentHashMap<>();
+    final AtomicReference<Exception> irrecoverableError = new 
AtomicReference<>();
+    final ScheduledExecutorService executorService;
+
+    ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) throws 
MalformedURLException {
+        this.config = elasticSearchConfig;
+        this.configCallback = new ConfigCallback();
+        this.backoffRetry = new 
RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
+        if (config.isBulkEnabled() == false) {
+            bulkProcessor = null;
+        } else {
+            BulkProcessor.Builder builder = BulkProcessor.builder(
+                    (bulkRequest, bulkResponseActionListener) -> 
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, 
bulkResponseActionListener),
+                    new BulkProcessor.Listener() {
+                        @Override
+                        public void beforeBulk(long l, BulkRequest 
bulkRequest) {
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
+                            log.trace("Bulk request id={} size={}:", l, 
bulkRequest.requests().size());
+                            for (int i = 0; i < 
bulkResponse.getItems().length; i++) {
+                                DocWriteRequest<?> request = 
bulkRequest.requests().get(i);
+                                Record record = records.get(request);
+                                BulkItemResponse bulkItemResponse = 
bulkResponse.getItems()[i];
+                                if (bulkItemResponse.isFailed()) {
+                                    record.fail();
+                                    try {
+                                        
hasIrrecoverableError(bulkItemResponse);
+                                    } catch(Exception e) {
+                                        log.warn("Unrecoverable error:", e);
+                                    }
+                                } else {
+                                    record.ack();
+                                }
+                                records.remove(request);
+                            }
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, 
Throwable throwable) {
+                            log.warn("Bulk request id={} failed:", l, 
throwable);
+                            for (DocWriteRequest<?> request : 
bulkRequest.requests()) {
+                                Record record = records.remove(request);
+                                record.fail();
+                            }
+                        }
+                    }
+            )
+                    .setBulkActions(config.getBulkActions())
+                    .setBulkSize(new ByteSizeValue(config.getBulkSizeInMb(), 
ByteSizeUnit.MB))
+                    .setConcurrentRequests(config.getBulkConcurrentRequests())
+                    .setBackoffPolicy(new 
RandomExponentialBackoffPolicy(backoffRetry,
+                            config.getRetryBackoffInMs(),
+                            config.getMaxRetries()
+                    ));
+            if (config.getBulkFlushIntervalInMs() > 0) {
+                builder.setFlushInterval(new 
TimeValue(config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS));
+            }
+            this.bulkProcessor = builder.build();
+        }
+
+        // idle+expired connection evictor thread
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorService.scheduleAtFixedRate(new Runnable() {
+                                                     @Override
+                                                     public void run() {
+                                                         
configCallback.connectionManager.closeExpiredConnections();
+                                                         
configCallback.connectionManager.closeIdleConnections(
+                                                                 
config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
+                                                     }
+                                                 },
+                config.getConnectionIdleTimeoutInMs(),
+                config.getConnectionIdleTimeoutInMs(),
+                TimeUnit.MILLISECONDS
+        );
+
+        URL url = new URL(config.getElasticSearchUrl());
+        log.info("ElasticSearch URL {}", url);
+        RestClientBuilder builder = RestClient.builder(new 
HttpHost(url.getHost(), url.getPort(), url.getProtocol()))
+                .setRequestConfigCallback(new 
RestClientBuilder.RequestConfigCallback() {
+                    @Override
+                    public RequestConfig.Builder 
customizeRequestConfig(RequestConfig.Builder builder) {
+                        return builder
+                                
.setContentCompressionEnabled(config.isCompressionEnabled())
+                                
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
+                                
.setConnectTimeout(config.getConnectTimeoutInMs())
+                                
.setSocketTimeout(config.getSocketTimeoutInMs());
+                    }
+                })
+                .setHttpClientConfigCallback(this.configCallback)
+                .setFailureListener(new RestClient.FailureListener() {
+                    public void onFailure(Node node) {
+                        log.warn("Node host={} failed", node.getHost());
+                    }
+                });
+        this.client = new RestHighLevelClient(builder);
+    }
+
+    void failed(Exception e) throws Exception {
+        if (irrecoverableError.compareAndSet(null, e)) {
+            log.error("Irrecoverable error:", e);
+        }
+    }
+
+    boolean isFailed() {
+        return irrecoverableError.get() != null;
+    }
+
+    void hasIrrecoverableError(BulkItemResponse bulkItemResponse) throws 
Exception {
+        for (String error : malformedErrors) {
+            if (bulkItemResponse.getFailureMessage().contains(error)) {
+                switch (config.getMalformedDocAction()) {
+                    case IGNORE:
+                        break;
+                    case WARN:
+                        log.warn("Ignoring malformed document index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        break;
+                    case FAIL:
+                        log.error("Failure due to the malformed document 
index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        failed(bulkItemResponse.getFailure().getCause());
+                        break;
+                }
+            }
+        }
+    }
+
+    public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws 
Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = 
Requests.indexRequest(config.getIndexName());
+            if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
+                indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+
+            records.put(indexRequest, record);
+            bulkProcessor.add(indexRequest);
+        } catch(Exception e) {
+            log.debug("index failed id=" + idAndDoc.getLeft(), e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Index an elasticsearch document and ack the record.
+     * @param record
+     * @param idAndDoc
+     * @return
+     * @throws Exception
+     */
+    public boolean indexDocument(Record<GenericObject> record, Pair<String, 
String> idAndDoc) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = 
Requests.indexRequest(config.getIndexName());
+            if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
+                indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+            IndexResponse indexResponse = client.index(indexRequest, 
RequestOptions.DEFAULT);
+            if 
(indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
+                    
indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
+                record.ack();
+                return true;
+            } else {
+                record.fail();
+                return false;
+            }
+        } catch (final Exception ex) {
+            log.error("index failed id=" + idAndDoc.getLeft(), ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    public void bulkDelete(Record<GenericObject> record, String id) throws 
Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+
+            records.put(deleteRequest, record);
+            bulkProcessor.add(deleteRequest);
+        } catch(Exception e) {
+            log.debug("delete failed id=" + id, e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Delete an elasticsearch document and ack the record.
+     * @param record
+     * @param id
+     * @return
+     * @throws IOException
+     */
+    public boolean deleteDocument(Record<GenericObject> record, String id) 
throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+            DeleteResponse deleteResponse = client.delete(deleteRequest, 
RequestOptions.DEFAULT);
+            log.debug("delete result=" + deleteResponse.getResult());
+            if 
(deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
+                    
deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
+                record.ack();
+                return true;
+            }
+            record.fail();
+            return false;
+        } catch (final Exception ex) {
+            log.debug("index failed id=" + id, ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    /**
+     * Flushes the bulk processor.
+     */
+    public void flush() {
+        bulkProcessor.flush();
+    }
+
+    public void close() {
+        try {
+            if (bulkProcessor != null) {
+                bulkProcessor.awaitClose(5000L, TimeUnit.MILLISECONDS);
+            }
+        } catch (InterruptedException e) {
+            log.warn("Elasticsearch bulk processor close error:", e);
+        }
+        try {
+            this.executorService.shutdown();
+            if (this.client != null) {
+                this.client.close();
+            }
+        } catch (IOException e) {
+            log.warn("Elasticsearch client close error:", e);
+        }
+    }
+
+    private void checkNotFailed() throws Exception {
+        if (irrecoverableError.get() != null) {
+            throw irrecoverableError.get();
+        }
+    }
+
+    private void checkIndexExists(Optional<String> topicName) throws 
IOException {
+        if (!config.isCreateIndexIfNeeded()) {
+            return;
+        }
+        String indexName = indexName(topicName);
+        if (!indexCache.contains(indexName)) {
+            synchronized (this) {
+                if (!indexCache.contains(indexName)) {
+                    createIndexIfNeeded(indexName);
+                    indexCache.add(indexName);
+                }
+            }
+        }
+    }
+
+    private String indexName(Optional<String> topicName) throws IOException {
+        if (config.getIndexName() != null) {
+            // Use the configured indexName if provided.
+            return config.getIndexName();
+        }
+        if (!topicName.isPresent()) {
+            throw new IOException("Elasticsearch index name configuration and 
topic name are empty");
+        }
+        return topicToIndexName(topicName.get());
+    }
+
+    @VisibleForTesting
+    public String topicToIndexName(String topicName) {
+        return topicToIndexCache.computeIfAbsent(topicName, k -> {
+            // see elasticsearch limitations 
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
+            String indexName = topicName.toLowerCase(Locale.ROOT);
+
+            // remove the pulsar topic info persistent://tenant/namespace/topic
+            String[] parts = indexName.split("/");
+            if (parts.length > 1) {
+                indexName = parts[parts.length-1];
+            }
+
+            // truncate to the max bytes length
+            while (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
+                indexName = indexName.substring(0, indexName.length() - 1);
+            }
+            if (indexName.length() <= 0 || 
!indexName.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*")) {
+                throw new RuntimeException(new IOException("Cannot convert the 
topic name='" + topicName + "' to a valid elasticsearch index name"));
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Translate topic={} to index={}", k, indexName);
+            }
+            return indexName;
+        });
+    }
+
+    @VisibleForTesting
+    public boolean createIndexIfNeeded(String indexName) throws IOException {
+        if (indexExists(indexName)) {
+            return false;
+        }
+        final CreateIndexRequest cireq = new CreateIndexRequest(indexName);
+        cireq.settings(Settings.builder()
+                .put("index.number_of_shards", config.getIndexNumberOfShards())
+                .put("index.number_of_replicas", 
config.getIndexNumberOfReplicas()));
+        return retry(() -> {
+            CreateIndexResponse resp = client.indices().create(cireq, 
RequestOptions.DEFAULT);
+            if (!resp.isAcknowledged() || !resp.isShardsAcknowledged()) {
+                throw new IOException("Unable to create index.");
+            }
+            return true;
+        }, "create index");
+    }
+
+    public boolean indexExists(final String indexName) throws IOException {
+        final GetIndexRequest request = new GetIndexRequest(indexName);
+        return retry(() -> client.indices().exists(request, 
RequestOptions.DEFAULT), "index exists");
+    }
+
+    @VisibleForTesting
+    protected long totalHits(String indexName) throws IOException {
+        client.indices().refresh(new RefreshRequest(indexName), 
RequestOptions.DEFAULT);
+        SearchResponse response =  client.search(
+                new SearchRequest()
+                        .indices(indexName)
+                        .source(new 
SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+                RequestOptions.DEFAULT);
+        for(SearchHit searchHit : response.getHits()) {
+            System.out.println(searchHit.getId()+": "+searchHit.getFields());

Review comment:
       Correct. This is only used by the tests, but it is better to use the logger consistently.

##########
File path: 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
##########
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Record;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+public class ElasticSearchClient {
+
+    static final String[] malformedErrors = {
+            "mapper_parsing_exception",
+            "action_request_validation_exception",
+            "illegal_argument_exception"
+    };
+
+    private ElasticSearchConfig config;
+    private ConfigCallback configCallback;
+    private RestHighLevelClient client;
+
+    final Set<String> indexCache = new HashSet<>();
+    final Map<String, String> topicToIndexCache = new HashMap<>();
+
+    final RandomExponentialRetry backoffRetry;
+    final BulkProcessor bulkProcessor;
+    final ConcurrentMap<DocWriteRequest<?>, Record> records = new 
ConcurrentHashMap<>();
+    final AtomicReference<Exception> irrecoverableError = new 
AtomicReference<>();
+    final ScheduledExecutorService executorService;
+
+    ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) throws 
MalformedURLException {
+        this.config = elasticSearchConfig;
+        this.configCallback = new ConfigCallback();
+        this.backoffRetry = new 
RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
+        if (config.isBulkEnabled() == false) {
+            bulkProcessor = null;
+        } else {
+            BulkProcessor.Builder builder = BulkProcessor.builder(
+                    (bulkRequest, bulkResponseActionListener) -> 
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, 
bulkResponseActionListener),
+                    new BulkProcessor.Listener() {
+                        @Override
+                        public void beforeBulk(long l, BulkRequest 
bulkRequest) {
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
+                            log.trace("Bulk request id={} size={}:", l, 
bulkRequest.requests().size());
+                            for (int i = 0; i < 
bulkResponse.getItems().length; i++) {
+                                DocWriteRequest<?> request = 
bulkRequest.requests().get(i);
+                                Record record = records.get(request);
+                                BulkItemResponse bulkItemResponse = 
bulkResponse.getItems()[i];
+                                if (bulkItemResponse.isFailed()) {
+                                    record.fail();
+                                    try {
+                                        
hasIrrecoverableError(bulkItemResponse);
+                                    } catch(Exception e) {
+                                        log.warn("Unrecoverable error:", e);
+                                    }
+                                } else {
+                                    record.ack();
+                                }
+                                records.remove(request);
+                            }
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, 
Throwable throwable) {
+                            log.warn("Bulk request id={} failed:", l, 
throwable);
+                            for (DocWriteRequest<?> request : 
bulkRequest.requests()) {
+                                Record record = records.remove(request);
+                                record.fail();
+                            }
+                        }
+                    }
+            )
+                    .setBulkActions(config.getBulkActions())
+                    .setBulkSize(new ByteSizeValue(config.getBulkSizeInMb(), 
ByteSizeUnit.MB))
+                    .setConcurrentRequests(config.getBulkConcurrentRequests())
+                    .setBackoffPolicy(new 
RandomExponentialBackoffPolicy(backoffRetry,
+                            config.getRetryBackoffInMs(),
+                            config.getMaxRetries()
+                    ));
+            if (config.getBulkFlushIntervalInMs() > 0) {
+                builder.setFlushInterval(new 
TimeValue(config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS));
+            }
+            this.bulkProcessor = builder.build();
+        }
+
+        // idle+expired connection evictor thread
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorService.scheduleAtFixedRate(new Runnable() {
+                                                     @Override
+                                                     public void run() {
+                                                         
configCallback.connectionManager.closeExpiredConnections();
+                                                         
configCallback.connectionManager.closeIdleConnections(
+                                                                 
config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
+                                                     }
+                                                 },
+                config.getConnectionIdleTimeoutInMs(),
+                config.getConnectionIdleTimeoutInMs(),
+                TimeUnit.MILLISECONDS
+        );
+
+        URL url = new URL(config.getElasticSearchUrl());
+        log.info("ElasticSearch URL {}", url);
+        RestClientBuilder builder = RestClient.builder(new 
HttpHost(url.getHost(), url.getPort(), url.getProtocol()))
+                .setRequestConfigCallback(new 
RestClientBuilder.RequestConfigCallback() {
+                    @Override
+                    public RequestConfig.Builder 
customizeRequestConfig(RequestConfig.Builder builder) {
+                        return builder
+                                
.setContentCompressionEnabled(config.isCompressionEnabled())
+                                
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
+                                
.setConnectTimeout(config.getConnectTimeoutInMs())
+                                
.setSocketTimeout(config.getSocketTimeoutInMs());
+                    }
+                })
+                .setHttpClientConfigCallback(this.configCallback)
+                .setFailureListener(new RestClient.FailureListener() {
+                    public void onFailure(Node node) {
+                        log.warn("Node host={} failed", node.getHost());
+                    }
+                });
+        this.client = new RestHighLevelClient(builder);
+    }
+
+    void failed(Exception e) throws Exception {
+        if (irrecoverableError.compareAndSet(null, e)) {
+            log.error("Irrecoverable error:", e);
+        }
+    }
+
+    boolean isFailed() {
+        return irrecoverableError.get() != null;
+    }
+
+    void hasIrrecoverableError(BulkItemResponse bulkItemResponse) throws 
Exception {
+        for (String error : malformedErrors) {
+            if (bulkItemResponse.getFailureMessage().contains(error)) {
+                switch (config.getMalformedDocAction()) {
+                    case IGNORE:
+                        break;
+                    case WARN:
+                        log.warn("Ignoring malformed document index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        break;
+                    case FAIL:
+                        log.error("Failure due to the malformed document 
index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        failed(bulkItemResponse.getFailure().getCause());
+                        break;
+                }
+            }
+        }
+    }
+
+    public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws 
Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = 
Requests.indexRequest(config.getIndexName());
+            if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
+                indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+
+            records.put(indexRequest, record);
+            bulkProcessor.add(indexRequest);
+        } catch(Exception e) {
+            log.debug("index failed id=" + idAndDoc.getLeft(), e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Index an elasticsearch document and ack the record.
+     * @param record
+     * @param idAndDoc
+     * @return
+     * @throws Exception
+     */
+    public boolean indexDocument(Record<GenericObject> record, Pair<String, 
String> idAndDoc) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = 
Requests.indexRequest(config.getIndexName());
+            if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
+                indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+            IndexResponse indexResponse = client.index(indexRequest, 
RequestOptions.DEFAULT);
+            if 
(indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
+                    
indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
+                record.ack();
+                return true;
+            } else {
+                record.fail();
+                return false;
+            }
+        } catch (final Exception ex) {
+            log.error("index failed id=" + idAndDoc.getLeft(), ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    public void bulkDelete(Record<GenericObject> record, String id) throws 
Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+
+            records.put(deleteRequest, record);
+            bulkProcessor.add(deleteRequest);
+        } catch(Exception e) {
+            log.debug("delete failed id=" + id, e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Delete an elasticsearch document and ack the record.
+     * @param record
+     * @param id
+     * @return
+     * @throws IOException
+     */
+    public boolean deleteDocument(Record<GenericObject> record, String id) 
throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+            DeleteResponse deleteResponse = client.delete(deleteRequest, 
RequestOptions.DEFAULT);
+            log.debug("delete result=" + deleteResponse.getResult());
+            if 
(deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
+                    
deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
+                record.ack();
+                return true;
+            }
+            record.fail();
+            return false;
+        } catch (final Exception ex) {
+            log.debug("index failed id=" + id, ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    /**
+     * Flushes the bulk processor.
+     */
+    public void flush() {
+        bulkProcessor.flush();
+    }
+
+    public void close() {
+        try {
+            if (bulkProcessor != null) {
+                bulkProcessor.awaitClose(5000L, TimeUnit.MILLISECONDS);
+            }
+        } catch (InterruptedException e) {
+            log.warn("Elasticsearch bulk processor close error:", e);
+        }
+        try {
+            this.executorService.shutdown();
+            if (this.client != null) {
+                this.client.close();
+            }
+        } catch (IOException e) {
+            log.warn("Elasticsearch client close error:", e);
+        }
+    }
+
+    private void checkNotFailed() throws Exception {
+        if (irrecoverableError.get() != null) {
+            throw irrecoverableError.get();
+        }
+    }
+
+    private void checkIndexExists(Optional<String> topicName) throws 
IOException {
+        if (!config.isCreateIndexIfNeeded()) {
+            return;
+        }
+        String indexName = indexName(topicName);
+        if (!indexCache.contains(indexName)) {
+            synchronized (this) {
+                if (!indexCache.contains(indexName)) {
+                    createIndexIfNeeded(indexName);
+                    indexCache.add(indexName);
+                }
+            }
+        }
+    }
+
+    private String indexName(Optional<String> topicName) throws IOException {
+        if (config.getIndexName() != null) {
+            // Use the configured indexName if provided.
+            return config.getIndexName();
+        }
+        if (!topicName.isPresent()) {
+            throw new IOException("Elasticsearch index name configuration and 
topic name are empty");
+        }
+        return topicToIndexName(topicName.get());
+    }
+
+    @VisibleForTesting
+    public String topicToIndexName(String topicName) {
+        return topicToIndexCache.computeIfAbsent(topicName, k -> {
+            // see elasticsearch limitations 
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
+            String indexName = topicName.toLowerCase(Locale.ROOT);
+
+            // remove the pulsar topic info persistent://tenant/namespace/topic
+            String[] parts = indexName.split("/");
+            if (parts.length > 1) {
+                indexName = parts[parts.length-1];
+            }
+
+            // truncate to the max bytes length
+            while (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
+                indexName = indexName.substring(0, indexName.length() - 1);
+            }
+            if (indexName.length() <= 0 || 
!indexName.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*")) {
+                throw new RuntimeException(new IOException("Cannot convert the 
topic name='" + topicName + "' to a valid elasticsearch index name"));
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Translate topic={} to index={}", k, indexName);
+            }
+            return indexName;
+        });
+    }
+
+    @VisibleForTesting
+    public boolean createIndexIfNeeded(String indexName) throws IOException {
+        if (indexExists(indexName)) {
+            return false;
+        }
+        final CreateIndexRequest cireq = new CreateIndexRequest(indexName);
+        cireq.settings(Settings.builder()
+                .put("index.number_of_shards", config.getIndexNumberOfShards())
+                .put("index.number_of_replicas", 
config.getIndexNumberOfReplicas()));
+        return retry(() -> {
+            CreateIndexResponse resp = client.indices().create(cireq, 
RequestOptions.DEFAULT);
+            if (!resp.isAcknowledged() || !resp.isShardsAcknowledged()) {
+                throw new IOException("Unable to create index.");
+            }
+            return true;
+        }, "create index");
+    }
+
+    public boolean indexExists(final String indexName) throws IOException {
+        final GetIndexRequest request = new GetIndexRequest(indexName);
+        return retry(() -> client.indices().exists(request, 
RequestOptions.DEFAULT), "index exists");
+    }
+
+    @VisibleForTesting
+    protected long totalHits(String indexName) throws IOException {

Review comment:
       Good idea. By the way, this method is only used by the tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to