This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 46fcb237c8 [hotfix][connector-elasticsearch-sink] Convert index to lowercase  (#8429)
46fcb237c8 is described below

commit 46fcb237c8862bcf16bb302358d77db6b752119f
Author: Jast <[email protected]>
AuthorDate: Fri Jan 3 10:18:23 2025 +0800

    [hotfix][connector-elasticsearch-sink] Convert index to lowercase  (#8429)
---
 .../elasticsearch/client/EsRestClient.java         | 12 ++--
 .../sink/ElasticsearchSinkWriter.java              |  3 +-
 .../connector/elasticsearch/ElasticsearchIT.java   | 33 ++++++++++
 ...rce_to_elasticsearch_with_upper_case_index.conf | 71 ++++++++++++++++++++++
 4 files changed, 113 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 279d480e92..87702bd80a 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -388,7 +388,7 @@ public class EsRestClient implements Closeable {
      * @return true or false
      */
     public boolean checkIndexExist(String index) {
-        Request request = new Request("HEAD", "/" + index);
+        Request request = new Request("HEAD", "/" + index.toLowerCase());
         try {
             Response response = restClient.performRequest(request);
             int statusCode = response.getStatusLine().getStatusCode();
@@ -400,7 +400,9 @@ public class EsRestClient implements Closeable {
     }
 
     public List<IndexDocsCount> getIndexDocsCount(String index) {
-        String endpoint = String.format("/_cat/indices/%s?h=index,docsCount&format=json", index);
+        String endpoint =
+                String.format(
+                        "/_cat/indices/%s?h=index,docsCount&format=json", index.toLowerCase());
         Request request = new Request("GET", endpoint);
         try {
             Response response = restClient.performRequest(request);
@@ -458,7 +460,7 @@ public class EsRestClient implements Closeable {
     }
 
     public void createIndex(String indexName, String mapping) {
-        String endpoint = String.format("/%s", indexName);
+        String endpoint = String.format("/%s", indexName.toLowerCase());
         Request request = new Request("PUT", endpoint);
         if (StringUtils.isNotEmpty(mapping)) {
             request.setJsonEntity(mapping);
@@ -484,7 +486,7 @@ public class EsRestClient implements Closeable {
     }
 
     public void dropIndex(String tableName) {
-        String endpoint = String.format("/%s", tableName);
+        String endpoint = String.format("/%s", tableName.toLowerCase());
         Request request = new Request("DELETE", endpoint);
         try {
             Response response = restClient.performRequest(request);
@@ -510,7 +512,7 @@ public class EsRestClient implements Closeable {
     }
 
     public void clearIndexData(String indexName) {
-        String endpoint = String.format("/%s/_delete_by_query", indexName);
+        String endpoint = String.format("/%s/_delete_by_query", indexName.toLowerCase());
         Request request = new Request("POST", endpoint);
         String jsonString = "{ \"query\": { \"match_all\": {} } }";
         request.setJsonEntity(jsonString);
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 8cb054ca0a..06c6e40f38 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -70,7 +70,8 @@ public class ElasticsearchSinkWriter
         this.context = context;
         this.maxBatchSize = maxBatchSize;
 
-        IndexInfo indexInfo = new IndexInfo(catalogTable.getTableId().getTableName(), config);
+        IndexInfo indexInfo =
+                new IndexInfo(catalogTable.getTableId().getTableName().toLowerCase(), config);
         esRestClient = EsRestClient.createInstance(config);
         this.seaTunnelRowSerializer =
                 new ElasticsearchRowSerializer(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ca05a4b118..87730fee46 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.apache.commons.io.IOUtils;
 
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +76,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Function;
@@ -352,6 +354,37 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
                 esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
     }
 
+    @TestTemplate
+    public void testFakeSourceToElasticsearchWithUpperCaseIndex(TestContainer container) {
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        Container.ExecResult execResult =
+                                container.executeJob(
+                                        "/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf");
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        Awaitility.await()
+                .atMost(120, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .pollDelay(10, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    20,
+                                    esRestClient
+                                            .getIndexDocsCount("st_fake_table")
+                                            .get(0)
+                                            .getDocsCount());
+                        });
+    }
+
     @TestTemplate
     public void testElasticsearchWithoutSchema(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf
new file mode 100644
index 0000000000..dc557703c4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+    FakeSource {
+        plugin_output = "fake"
+        row.num = 20
+        schema {
+        table = "FakeDatabase.FAKE_TABLE"
+        columns = [
+                {
+                    name = id
+                    type = bigint
+                    nullable = false
+                    comment = "primary key id"
+                },
+                {
+                    name = name
+                    type = "string"
+                    comment = "name"
+                },
+                {
+                    name = age
+                    type = int
+                    comment = "age"
+                }
+            ]
+        }
+    }
+}
+
+transform {
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_${table_name}"
+    index_type = "_doc"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
+}

Reply via email to