This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 46fcb237c8 [hotfix][connector-elasticsearch-sink] Convert index to lowercase (#8429)
46fcb237c8 is described below
commit 46fcb237c8862bcf16bb302358d77db6b752119f
Author: Jast <[email protected]>
AuthorDate: Fri Jan 3 10:18:23 2025 +0800
[hotfix][connector-elasticsearch-sink] Convert index to lowercase (#8429)
---
.../elasticsearch/client/EsRestClient.java | 12 ++--
.../sink/ElasticsearchSinkWriter.java | 3 +-
.../connector/elasticsearch/ElasticsearchIT.java | 33 ++++++++++
...rce_to_elasticsearch_with_upper_case_index.conf | 71 ++++++++++++++++++++++
4 files changed, 113 insertions(+), 6 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 279d480e92..87702bd80a 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -388,7 +388,7 @@ public class EsRestClient implements Closeable {
* @return true or false
*/
public boolean checkIndexExist(String index) {
- Request request = new Request("HEAD", "/" + index);
+ Request request = new Request("HEAD", "/" + index.toLowerCase());
try {
Response response = restClient.performRequest(request);
int statusCode = response.getStatusLine().getStatusCode();
@@ -400,7 +400,9 @@ public class EsRestClient implements Closeable {
}
public List<IndexDocsCount> getIndexDocsCount(String index) {
- String endpoint = String.format("/_cat/indices/%s?h=index,docsCount&format=json", index);
+ String endpoint =
+ String.format(
+ "/_cat/indices/%s?h=index,docsCount&format=json", index.toLowerCase());
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
@@ -458,7 +460,7 @@ public class EsRestClient implements Closeable {
}
public void createIndex(String indexName, String mapping) {
- String endpoint = String.format("/%s", indexName);
+ String endpoint = String.format("/%s", indexName.toLowerCase());
Request request = new Request("PUT", endpoint);
if (StringUtils.isNotEmpty(mapping)) {
request.setJsonEntity(mapping);
@@ -484,7 +486,7 @@ public class EsRestClient implements Closeable {
}
public void dropIndex(String tableName) {
- String endpoint = String.format("/%s", tableName);
+ String endpoint = String.format("/%s", tableName.toLowerCase());
Request request = new Request("DELETE", endpoint);
try {
Response response = restClient.performRequest(request);
@@ -510,7 +512,7 @@ public class EsRestClient implements Closeable {
}
public void clearIndexData(String indexName) {
- String endpoint = String.format("/%s/_delete_by_query", indexName);
+ String endpoint = String.format("/%s/_delete_by_query", indexName.toLowerCase());
Request request = new Request("POST", endpoint);
String jsonString = "{ \"query\": { \"match_all\": {} } }";
request.setJsonEntity(jsonString);
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 8cb054ca0a..06c6e40f38 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -70,7 +70,8 @@ public class ElasticsearchSinkWriter
this.context = context;
this.maxBatchSize = maxBatchSize;
- IndexInfo indexInfo = new IndexInfo(catalogTable.getTableId().getTableName(), config);
+ IndexInfo indexInfo =
+ new IndexInfo(catalogTable.getTableId().getTableName().toLowerCase(), config);
esRestClient = EsRestClient.createInstance(config);
this.seaTunnelRowSerializer =
new ElasticsearchRowSerializer(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ca05a4b118..87730fee46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -42,6 +42,7 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.commons.io.IOUtils;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +76,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
@@ -352,6 +354,37 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
}
+ @TestTemplate
+ public void testFakeSourceToElasticsearchWithUpperCaseIndex(TestContainer container) {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ Awaitility.await()
+ .atMost(120, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .pollInterval(3, TimeUnit.SECONDS)
+ .pollDelay(10, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+ 20,
+ esRestClient
+ .getIndexDocsCount("st_fake_table")
+ .get(0)
+ .getDocsCount());
+ });
+ }
+
@TestTemplate
public void testElasticsearchWithoutSchema(TestContainer container)
throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf
new file mode 100644
index 0000000000..dc557703c4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 20
+ schema {
+ table = "FakeDatabase.FAKE_TABLE"
+ columns = [
+ {
+ name = id
+ type = bigint
+ nullable = false
+ comment = "primary key id"
+ },
+ {
+ name = name
+ type = "string"
+ comment = "name"
+ },
+ {
+ name = age
+ type = int
+ comment = "age"
+ }
+ ]
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_${table_name}"
+ index_type = "_doc"
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ }
+}