This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 35c020fcb [Improve][Connector-V2][ElasticSearch] Refactor
ElasticSearch connector e2e test cases (#3613)
35c020fcb is described below
commit 35c020fcb24b05291e1369772c65d96581eabc90
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Mon Dec 5 23:57:46 2022 +0800
[Improve][Connector-V2][ElasticSearch] Refactor ElasticSearch connector e2e
test cases (#3613)
* [Improve][Connector-V2][ElasticSearch] Refactor ElasticSearch connector
e2e test cases
---
.../connector-elasticsearch-e2e}/pom.xml | 13 +-
.../connector}/elasticsearch/ElasticsearchIT.java | 34 ++--
.../elasticsearch_source_and_sink.conf | 0
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 -
.../connector-elasticsearch-spark-e2e/pom.xml | 61 --------
.../spark/v2/elasticsearch/ElasticsearchIT.java | 171 ---------------------
.../elasticsearch_source_and_sink.conf | 67 --------
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 -
9 files changed, 22 insertions(+), 327 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/pom.xml
similarity index 81%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/pom.xml
index e1c1e4841..b0e13d290 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/pom.xml
@@ -18,23 +18,14 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-elasticsearch-flink-e2e</artifactId>
+ <artifactId>connector-elasticsearch-e2e</artifactId>
<dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-flink-e2e-base</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
<!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
similarity index 85%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index eded55f56..1427eaaea 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.flink.v2.elasticsearch;
+package org.apache.seatunnel.e2e.connector.elasticsearch;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
@@ -50,7 +52,7 @@ import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
-public class ElasticsearchIT extends FlinkContainer {
+public class ElasticsearchIT extends TestSuiteBase implements TestResource {
private List<String> testDataset;
@@ -59,11 +61,12 @@ public class ElasticsearchIT extends FlinkContainer {
private EsRestClient esRestClient;
@BeforeEach
- public void startMongoContainer() throws Exception {
+ @Override
+ public void startUp() throws Exception {
container = new
ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
- .withNetwork(NETWORK)
- .withNetworkAliases("elasticsearch")
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:6.8.23")));
+ .withNetwork(NETWORK)
+ .withNetworkAliases("elasticsearch")
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:6.8.23")));
container.start();
log.info("Elasticsearch container started");
esRestClient =
EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()),
"", "");
@@ -91,14 +94,14 @@ public class ElasticsearchIT extends FlinkContainer {
esRestClient.bulk(requestBody.toString());
}
- @Test
- public void testElasticsearch() throws IOException, InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/elasticsearch/elasticsearch_source_and_sink.conf");
+ @TestTemplate
+ public void testElasticsearch(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
List<String> sinData = readSinkData();
Assertions.assertIterableEquals(
- testDataset,
- sinData
+ testDataset,
+ sinData
);
}
@@ -140,7 +143,7 @@ public class ElasticsearchIT extends FlinkContainer {
LocalDate.now().toString(),
LocalDateTime.now().toString()
};
- for (int j = 0; j < fields.length; j++){
+ for (int j = 0; j < fields.length; j++) {
doc.put(fields[j], values[j]);
}
documents.add(objectMapper.writeValueAsString(doc));
@@ -163,7 +166,8 @@ public class ElasticsearchIT extends FlinkContainer {
}
@AfterEach
- public void close() {
+ @Override
+ public void tearDown() {
esRestClient.close();
container.close();
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
similarity index 100%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index d31cc7534..bb50ce56d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -37,6 +37,7 @@
<module>connector-http-e2e</module>
<module>connector-rabbitmq-e2e</module>
<module>connector-kafka-e2e</module>
+ <module>connector-elasticsearch-e2e</module>
</modules>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 776b82fe9..0b5f2bb0e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
<modules>
<module>connector-flink-e2e-base</module>
<module>connector-jdbc-flink-e2e</module>
- <module>connector-elasticsearch-flink-e2e</module>
<module>connector-iotdb-flink-e2e</module>
<module>connector-datahub-flink-e2e</module>
<module>connector-fake-flink-e2e</module>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml
deleted file mode 100644
index 9f7a1a72a..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>connector-elasticsearch-spark-e2e</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-spark-e2e-base</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <!-- SeaTunnel connectors -->
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-fake</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-elasticsearch</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
-
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>1.17.3</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java
deleted file mode 100644
index 81ac664eb..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.elasticsearch;
-
-import org.apache.seatunnel.common.utils.JsonUtils;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.net.UnknownHostException;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class ElasticsearchIT extends SparkContainer {
-
- private List<String> testDataset;
-
- private ElasticsearchContainer container;
-
- private EsRestClient esRestClient;
-
- @BeforeEach
- public void startElasticsearchContainer() throws Exception {
- container = new
ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
- .withNetwork(NETWORK)
- .withNetworkAliases("elasticsearch")
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:6.8.23")));
- container.start();
- log.info("Elasticsearch container started");
- esRestClient =
EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()),
"", "");
- testDataset = generateTestDataSet();
- createIndexDocs();
-
- }
-
- /**
- * create a index,and bulk some documents
- */
- private void createIndexDocs() {
- StringBuilder requestBody = new StringBuilder();
- Map<String, String> indexInner = new HashMap<>();
- indexInner.put("_index", "st");
-
- Map<String, Map<String, String>> indexParam = new HashMap<>();
- indexParam.put("index", indexInner);
- String indexHeader =
"{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}\n";
- for (int i = 0; i < testDataset.size(); i++) {
- String row = testDataset.get(i);
- requestBody.append(indexHeader);
- requestBody.append(row);
- requestBody.append("\n");
- }
- esRestClient.bulk(requestBody.toString());
- }
-
- @Test
- public void testElasticsearch() throws IOException, InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelSparkJob("/elasticsearch/elasticsearch_source_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- List<String> sinData = readSinkData();
- Assertions.assertIterableEquals(
- testDataset,
- sinData
- );
- }
-
- private List<String> generateTestDataSet() throws JsonProcessingException,
UnknownHostException {
- String[] fields = new String[]{
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date",
- "c_timestamp"
- };
-
- List<String> documents = new ArrayList<>();
- ObjectMapper objectMapper = new ObjectMapper();
- for (int i = 0; i < 100; i++) {
- Map<String, Object> doc = new HashMap<>();
- Object[] values = new Object[]{
- Collections.singletonMap("key",
Short.parseShort(String.valueOf(i))),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- i,
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now().toString(),
- LocalDateTime.now().toString()
- };
- for (int j = 0; j < fields.length; j++){
- doc.put(fields[j], values[j]);
- }
- documents.add(objectMapper.writeValueAsString(doc));
- }
- return documents;
- }
-
- private List<String> readSinkData() throws InterruptedException{
- //wait for index refresh
- Thread.sleep(2000);
- List<String> source = Lists.newArrayList("c_map", "c_array",
"c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint",
"c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp");
- ScrollResult scrollResult = esRestClient.searchByScroll("st_index2",
source, "1m", 1000);
- scrollResult.getDocs().forEach(x -> {
- x.remove("_index");
- x.remove("_type");
- x.remove("_id");
- });
- List<String> docs =
scrollResult.getDocs().stream().sorted(Comparator.comparingInt(o ->
Integer.valueOf(o.get("c_int").toString()))).map(JsonUtils::toJsonString).collect(Collectors.toList());
- return docs;
- }
-
- @AfterEach
- public void close() {
- esRestClient.close();
- container.close();
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
deleted file mode 100644
index 9d4f7d8a8..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ /dev/null
@@ -1,67 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
-
-env {
- # You can set spark configuration here
- # see available properties defined by spark:
https://spark.apache.org/docs/latest/configuration.html#available-properties
- job.mode = "BATCH"
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- Elasticsearch {
- hosts = ["elasticsearch:9200"]
- index = "st_index"
- schema = {
- fields {
- c_map = "map<string, tinyint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
- }
-}
-
-transform {
-}
-
-sink {
- Elasticsearch{
- hosts = ["elasticsearch:9200"]
- index = "st_index2"
- index_type = "st"
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index bbdd1f6a4..c7c7f8311 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
<modules>
<module>connector-spark-e2e-base</module>
<module>connector-datahub-spark-e2e</module>
- <module>connector-elasticsearch-spark-e2e</module>
<module>connector-fake-spark-e2e</module>
<module>connector-iotdb-spark-e2e</module>
<module>connector-jdbc-spark-e2e</module>