This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 45653e1d22 [Feature][Elasticsearch] Support multi-table sink write
#7041 (#7052)
45653e1d22 is described below
commit 45653e1d2259b21efbf49fd38e20f38080608a97
Author: CosmosNi <[email protected]>
AuthorDate: Mon Jun 24 21:38:41 2024 +0800
[Feature][Elasticsearch] Support multi-table sink write #7041 (#7052)
---
.../elasticsearch/sink/ElasticsearchSink.java | 2 +
.../sink/ElasticsearchSinkFactory.java | 55 +++++++++++--
.../sink/ElasticsearchSinkWriter.java | 8 +-
.../connector/elasticsearch/ElasticsearchIT.java | 78 +++++++++++++++++-
.../fakesource_to_elasticsearch_multi_sink.conf | 95 ++++++++++++++++++++++
5 files changed, 226 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index d2ca6045eb..6325a14e99 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -47,6 +48,7 @@ public class ElasticsearchSink
ElasticsearchSinkState,
ElasticsearchCommitInfo,
ElasticsearchAggregatedCommitInfo>,
+ SupportMultiTableSink,
SupportSaveMode {
private ReadonlyConfig config;
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index ad2c01e47e..63770dd1d7 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -28,6 +30,9 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig
import com.google.auto.service.AutoService;
+import java.util.HashMap;
+import java.util.Map;
+
import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
@@ -75,11 +80,14 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- String original = context.getOptions().get(INDEX);
- original =
- original.replace(
- REPLACE_TABLE_NAME_KEY,
- context.getCatalogTable().getTableId().getTableName());
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+
+ ReadonlyConfig finalReadonlyConfig =
+ generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+
+ String original = finalReadonlyConfig.get(INDEX);
+
CatalogTable newTable =
CatalogTable.of(
TableIdentifier.of(
@@ -87,6 +95,41 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
context.getCatalogTable().getTablePath().getDatabaseName(),
original),
context.getCatalogTable());
- return () -> new ElasticsearchSink(context.getOptions(), newTable);
+ return () -> new ElasticsearchSink(finalReadonlyConfig, newTable);
+ }
+
+ private ReadonlyConfig generateCurrentReadonlyConfig(
+ ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+
+ Map<String, String> configMap = readonlyConfig.toMap();
+
+ readonlyConfig
+ .getOptional(INDEX)
+ .ifPresent(
+ tableName -> {
+ String replacedPath =
+ replaceCatalogTableInPath(tableName,
catalogTable);
+ configMap.put(INDEX.key(), replacedPath);
+ });
+
+ return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+ }
+
+ private String replaceCatalogTableInPath(String originTableName,
CatalogTable catalogTable) {
+ String tableName = originTableName;
+ TableIdentifier tableIdentifier = catalogTable.getTableId();
+ if (tableIdentifier != null) {
+ if (tableIdentifier.getSchemaName() != null) {
+ tableName =
+ tableName.replace(
+
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+ tableIdentifier.getSchemaName());
+ }
+ if (tableIdentifier.getTableName() != null) {
+ tableName =
+ tableName.replace(REPLACE_TABLE_NAME_KEY,
tableIdentifier.getTableName());
+ }
+ }
+ return tableName;
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 6edac760c1..8cb054ca0a 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -47,9 +48,10 @@ import java.util.Optional;
*/
@Slf4j
public class ElasticsearchSinkWriter
- implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState> {
+ implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState>,
+ SupportMultiTableSinkWriter<Void> {
- private final SinkWriter.Context context;
+ private final Context context;
private final int maxBatchSize;
@@ -60,7 +62,7 @@ public class ElasticsearchSinkWriter
private static final long DEFAULT_SLEEP_TIME_MS = 200L;
public ElasticsearchSinkWriter(
- SinkWriter.Context context,
+ Context context,
CatalogTable catalogTable,
ReadonlyConfig config,
int maxBatchSize,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index b754ea425a..3180f386b2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -32,7 +32,9 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.commons.io.IOUtils;
@@ -50,6 +52,7 @@ import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -176,11 +179,54 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Container.ExecResult execResult =
container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- List<String> sinkData = readSinkData();
+ List<String> sinkData = readSinkData("st_index2");
// for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple
table read")
+ @TestTemplate
+ public void testElasticsearchWithMultiSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> source5 =
+ Lists.newArrayList(
+ "id",
+ "c_bool",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_string");
+ List<String> source6 =
+ Lists.newArrayList(
+ "id",
+ "c_bool",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal");
+ List<String> sinkIndexData5 = readMultiSinkData("st_index5", source5);
+ List<String> sinkIndexData6 = readMultiSinkData("st_index6", source6);
+ String stIndex5 =
+
"{\"c_smallint\":2,\"c_string\":\"NEW\",\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}";
+ String stIndex6 =
+
"{\"c_smallint\":2,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}";
+ Assertions.assertIterableEquals(Lists.newArrayList(stIndex5),
sinkIndexData5);
+ Assertions.assertIterableEquals(Lists.newArrayList(stIndex6),
sinkIndexData6);
+ }
+
@TestTemplate
public void testElasticsearchWithFullType(TestContainer container)
throws IOException, InterruptedException {
@@ -262,7 +308,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
return getDocsWithTransformDate(source, "st_index4");
}
- private List<String> readSinkData() throws InterruptedException {
+ private List<String> readSinkData(String index) throws
InterruptedException {
// wait for index refresh
Thread.sleep(2000);
List<String> source =
@@ -281,7 +327,33 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
"c_int",
"c_date",
"c_timestamp");
- return getDocsWithTransformTimestamp(source, "st_index2");
+ return getDocsWithTransformTimestamp(source, index);
+ }
+
+ private List<String> readMultiSinkData(String index, List<String> source)
+ throws InterruptedException {
+ // wait for index refresh
+ Thread.sleep(2000);
+ Map<String, Object> query = new HashMap<>();
+ query.put("match_all", Maps.newHashMap());
+
+ ScrollResult scrollResult = esRestClient.searchByScroll(index, source,
query, "1m", 1000);
+ scrollResult
+ .getDocs()
+ .forEach(
+ x -> {
+ x.remove("_index");
+ x.remove("_type");
+ x.remove("_id");
+ });
+ List<String> docs =
+ scrollResult.getDocs().stream()
+ .sorted(
+ Comparator.comparingInt(
+ o ->
Integer.valueOf(o.get("c_int").toString())))
+ .map(JsonUtils::toJsonString)
+ .collect(Collectors.toList());
+ return docs;
}
private List<String> getDocsWithTransformTimestamp(List<String> source,
String index) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf
new file mode 100644
index 0000000000..cbb28545f2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ #checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "st_index5"
+ fields {
+ id = int
+ c_bool = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(16, 1)"
+ c_string = string
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "st_index6"
+ fields {
+ id = int
+ c_bool = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(16, 1)"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+ }
+ ]
+ }
+ ]
+ }
+}
+transform {
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "${table_name}"
+ index_type = "st"
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ }
+}