This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5236a6d [Improve] remove jupiter5 dependency for testcase (#391)
5236a6d is described below
commit 5236a6d51959295626a7021f320d5926eadcb965
Author: wudi <[email protected]>
AuthorDate: Tue May 28 20:36:53 2024 +0800
[Improve] remove jupiter5 dependency for testcase (#391)
---
flink-doris-connector/pom.xml | 7 --
.../java/org/apache/doris/flink/DorisTestBase.java | 25 ++++++
.../apache/doris/flink/sink/DorisSinkITCase.java | 74 +++---------------
.../doris/flink/source/DorisSourceITCase.java | 15 ++--
.../doris/flink/tools/cdc/DorisDorisE2ECase.java | 5 +-
.../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 90 +++++-----------------
6 files changed, 63 insertions(+), 153 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 23145e4..db05e2e 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -90,7 +90,6 @@ under the License.
<slf4j.version>1.7.25</slf4j.version>
<mockito.version>4.2.0</mockito.version>
<testcontainers.version>1.17.6</testcontainers.version>
- <junit-jupiter.version>5.10.1</junit-jupiter.version>
<junit.version>4.11</junit.version>
<hamcrest.version>1.3</hamcrest.version>
</properties>
@@ -330,12 +329,6 @@ under the License.
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>${junit-jupiter.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index e0f75f0..8141eaf 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -18,6 +18,8 @@
package org.apache.doris.flink;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
@@ -177,4 +179,27 @@ public abstract class DorisTestBase {
}
return list;
}
+
+ public void checkResult(List<String> expected, String query, int
columnSize) throws Exception {
+ List<String> actual = new ArrayList<>();
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
+ ResultSet sinkResultSet = statement.executeQuery(query);
+ while (sinkResultSet.next()) {
+ List<String> row = new ArrayList<>();
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = sinkResultSet.getObject(i);
+ if (value == null) {
+ row.add("null");
+ } else {
+ row.add(value.toString());
+ }
+ }
+ actual.add(StringUtils.join(row, ","));
+ }
+ }
+ Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index c9501d3..ad6923c 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -28,22 +28,16 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
/** DorisSink ITCase with csv and arrow format. */
public class DorisSinkITCase extends DorisTestBase {
@@ -62,25 +56,9 @@ public class DorisSinkITCase extends DorisTestBase {
submitJob(TABLE_CSV, properties, new String[] {"doris,1"});
Thread.sleep(10000);
- Set<List<Object>> actual = new HashSet<>();
-
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet sinkResultSet =
- statement.executeQuery(
- String.format(
- "select name,age from %s.%s order by 1",
DATABASE, TABLE_CSV));
- while (sinkResultSet.next()) {
- List<Object> row =
- Arrays.asList(sinkResultSet.getString("name"),
sinkResultSet.getInt("age"));
- actual.add(row);
- }
- }
- Set<List<Object>> expected =
- Stream.<List<Object>>of(Arrays.asList("doris",
1)).collect(Collectors.toSet());
- Assertions.assertIterableEquals(expected, actual);
+ List<String> expected = Arrays.asList("doris,1");
+ String query = String.format("select name,age from %s.%s order by 1",
DATABASE, TABLE_CSV);
+ checkResult(expected, query, 2);
}
@Test
@@ -107,25 +85,9 @@ public class DorisSinkITCase extends DorisTestBase {
});
Thread.sleep(10000);
- Set<List<Object>> actual = new HashSet<>();
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet sinkResultSet =
- statement.executeQuery(
- String.format(
- "select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON));
- while (sinkResultSet.next()) {
- List<Object> row =
- Arrays.asList(sinkResultSet.getString("name"),
sinkResultSet.getInt("age"));
- actual.add(row);
- }
- }
- Set<List<Object>> expected =
- Stream.<List<Object>>of(Arrays.asList("doris1", 1),
Arrays.asList("doris2", 2))
- .collect(Collectors.toSet());
- Assertions.assertIterableEquals(expected, actual);
+ List<String> expected = Arrays.asList("doris1,1", "doris2,2");
+ String query = String.format("select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON);
+ checkResult(expected, query, 2);
}
public void submitJob(String table, Properties properties, String[]
records) throws Exception {
@@ -180,26 +142,10 @@ public class DorisSinkITCase extends DorisTestBase {
tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all
SELECT 'flink',2");
Thread.sleep(10000);
- Set<List<Object>> actual = new HashSet<>();
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet sinkResultSet =
- statement.executeQuery(
- String.format(
- "select name,age from %s.%s order by 1",
- DATABASE, TABLE_JSON_TBL));
- while (sinkResultSet.next()) {
- List<Object> row =
- Arrays.asList(sinkResultSet.getString("name"),
sinkResultSet.getInt("age"));
- actual.add(row);
- }
- }
- Set<List<Object>> expected =
- Stream.<List<Object>>of(Arrays.asList("doris", 1),
Arrays.asList("flink", 2))
- .collect(Collectors.toSet());
- Assertions.assertIterableEquals(expected, actual);
+ List<String> expected = Arrays.asList("doris,1", "flink,2");
+ String query =
+ String.format("select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON_TBL);
+ checkResult(expected, query, 2);
}
private void initializeTable(String table) throws Exception {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index f88b756..bc30c57 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -29,8 +29,8 @@ import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.junit.Assert;
import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -65,17 +65,16 @@ public class DorisSourceITCase extends DorisTestBase {
.setDorisOptions(dorisBuilder.build())
.setDeserializer(new SimpleListDeserializationSchema())
.build();
- List<Object> actual = new ArrayList<>();
+ List<String> actual = new ArrayList<>();
try (CloseableIterator<List<?>> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Doris Source")
.executeAndCollect()) {
while (iterator.hasNext()) {
- actual.add(iterator.next());
+ actual.add(iterator.next().toString());
}
}
- List<Object> expected =
- Arrays.asList(Arrays.asList("doris", 18),
Arrays.asList("flink", 10));
- Assertions.assertIterableEquals(expected, actual);
+ List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]");
+ Assert.assertArrayEquals(actual.toArray(), expected.toArray());
}
@Test
@@ -102,14 +101,14 @@ public class DorisSourceITCase extends DorisTestBase {
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("SELECT * FROM
doris_source");
- List<Object> actual = new ArrayList<>();
+ List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
- Assertions.assertIterableEquals(Arrays.asList(expected), actual);
+ Assert.assertArrayEquals(expected, actual.toArray());
}
private void initializeTable(String table) throws Exception {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
index ad40255..8f7998c 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
@@ -25,14 +25,13 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.doris.flink.DorisTestBase;
+import org.junit.Assert;
import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
/** DorisDorisE2ECase. */
@@ -90,7 +89,7 @@ public class DorisDorisE2ECase extends DorisTestBase {
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
- Assertions.assertIterableEquals(Arrays.asList(expected), actual);
+ Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
}
private void initializeDorisTable(String table) throws Exception {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 2aecaaf..3c12061 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -30,7 +30,6 @@ import
org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
@@ -38,21 +37,16 @@ import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.RUNNING;
@@ -99,12 +93,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Thread.sleep(20000);
- Set<List<Object>> expected =
- Stream.<List<Object>>of(
- Arrays.asList("doris_1", 1),
- Arrays.asList("doris_2", 2),
- Arrays.asList("doris_3", 3))
- .collect(Collectors.toSet());
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2",
"doris_3,3");
String sql =
"select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s ) res order by 1";
String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE,
TABLE_2, DATABASE, TABLE_3);
@@ -130,14 +119,9 @@ public class MySQLDorisE2ECase extends DorisTestBase {
}
Thread.sleep(20000);
- Set<List<Object>> expected2 =
- Stream.<List<Object>>of(
- Arrays.asList("doris_1", 18),
- Arrays.asList("doris_1_1", 10),
- Arrays.asList("doris_2_1", 11),
- Arrays.asList("doris_3", 3),
- Arrays.asList("doris_3_1", 12))
- .collect(Collectors.toSet());
+ List<String> expected2 =
+ Arrays.asList(
+ "doris_1,18", "doris_1_1,10", "doris_2_1,11",
"doris_3,3", "doris_3_1,12");
sql =
"select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s ) res order by 1";
String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE,
TABLE_2, DATABASE, TABLE_3);
@@ -160,12 +144,8 @@ public class MySQLDorisE2ECase extends DorisTestBase {
DATABASE, TABLE_1));
}
Thread.sleep(20000);
- Set<List<Object>> expected3 =
- Stream.<List<Object>>of(
- Arrays.asList("doris_1", null),
- Arrays.asList("doris_1_1", null),
- Arrays.asList("doris_1_1_1", "c1_val"))
- .collect(Collectors.toSet());
+ List<String> expected3 =
+ Arrays.asList("doris_1,null", "doris_1_1,null",
"doris_1_1_1,c1_val");
sql = "select * from %s.%s order by 1";
String query3 = String.format(sql, DATABASE, TABLE_1);
checkResult(expected3, query3, 2);
@@ -180,12 +160,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Thread.sleep(20000);
- Set<List<Object>> expected =
- Stream.<List<Object>>of(
- Arrays.asList("doris_1", 1),
- Arrays.asList("doris_2", 2),
- Arrays.asList("doris_3", 3))
- .collect(Collectors.toSet());
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2",
"doris_3,3");
String sql =
"select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s ) res order by 1";
String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE,
TABLE_2, DATABASE, TABLE_3);
@@ -194,10 +169,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
// auto create table4
addTableTable_4();
Thread.sleep(20000);
- Set<List<Object>> expected2 =
- Stream.<List<Object>>of(
- Arrays.asList("doris_4_1", 4),
Arrays.asList("doris_4_2", 4))
- .collect(Collectors.toSet());
+ List<String> expected2 = Arrays.asList("doris_4_1,4", "doris_4_2,4");
sql = "select * from %s.%s order by 1";
String query2 = String.format(sql, DATABASE, TABLE_4);
checkResult(expected2, query2, 2);
@@ -229,16 +201,15 @@ public class MySQLDorisE2ECase extends DorisTestBase {
}
Thread.sleep(20000);
- Set<List<Object>> expected3 =
- Stream.<List<Object>>of(
- Arrays.asList("doris_1", 18),
- Arrays.asList("doris_1_1", 10),
- Arrays.asList("doris_2_1", 11),
- Arrays.asList("doris_3", 3),
- Arrays.asList("doris_3_1", 12),
- Arrays.asList("doris_4_1", 41),
- Arrays.asList("doris_4_3", 43))
- .collect(Collectors.toSet());
+ List<String> expected3 =
+ Arrays.asList(
+ "doris_1,18",
+ "doris_1_1,10",
+ "doris_2_1,11",
+ "doris_3,3",
+ "doris_3_1,12",
+ "doris_4_1,41",
+ "doris_4_3,43");
sql =
"select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s union all select * from %s.%s ) res order
by 1";
String query3 =
@@ -263,12 +234,8 @@ public class MySQLDorisE2ECase extends DorisTestBase {
"insert into %s.%s values
('doris_4_4','c1_val')", DATABASE, TABLE_4));
}
Thread.sleep(20000);
- Set<List<Object>> expected4 =
- Stream.<List<Object>>of(
- Arrays.asList("doris_4_1", null),
- Arrays.asList("doris_4_3", null),
- Arrays.asList("doris_4_4", "c1_val"))
- .collect(Collectors.toSet());
+ List<String> expected4 =
+ Arrays.asList("doris_4_1,null", "doris_4_3,null",
"doris_4_4,c1_val");
sql = "select * from %s.%s order by 1";
String query4 = String.format(sql, DATABASE, TABLE_4);
checkResult(expected4, query4, 2);
@@ -288,25 +255,6 @@ public class MySQLDorisE2ECase extends DorisTestBase {
}
}
- public void checkResult(Set<List<Object>> expected, String query, int
columnSize)
- throws Exception {
- Set<List<Object>> actual = new HashSet<>();
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet sinkResultSet = statement.executeQuery(query);
- while (sinkResultSet.next()) {
- List<Object> row = new ArrayList<>();
- for (int i = 1; i <= columnSize; i++) {
- row.add(sinkResultSet.getObject(i));
- }
- actual.add(row);
- }
- }
- Assertions.assertIterableEquals(expected, actual);
- }
-
public JobClient submitJob() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]