This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5236a6d  [Improve] remove jupiter5 dependency for testcase (#391)
5236a6d is described below

commit 5236a6d51959295626a7021f320d5926eadcb965
Author: wudi <[email protected]>
AuthorDate: Tue May 28 20:36:53 2024 +0800

    [Improve] remove jupiter5 dependency for testcase (#391)
---
 flink-doris-connector/pom.xml                      |  7 --
 .../java/org/apache/doris/flink/DorisTestBase.java | 25 ++++++
 .../apache/doris/flink/sink/DorisSinkITCase.java   | 74 +++---------------
 .../doris/flink/source/DorisSourceITCase.java      | 15 ++--
 .../doris/flink/tools/cdc/DorisDorisE2ECase.java   |  5 +-
 .../doris/flink/tools/cdc/MySQLDorisE2ECase.java   | 90 +++++-----------------
 6 files changed, 63 insertions(+), 153 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 23145e4..db05e2e 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -90,7 +90,6 @@ under the License.
         <slf4j.version>1.7.25</slf4j.version>
         <mockito.version>4.2.0</mockito.version>
         <testcontainers.version>1.17.6</testcontainers.version>
-        <junit-jupiter.version>5.10.1</junit-jupiter.version>
         <junit.version>4.11</junit.version>
         <hamcrest.version>1.3</hamcrest.version>
     </properties>
@@ -330,12 +329,6 @@ under the License.
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit-jupiter.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index e0f75f0..8141eaf 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -18,6 +18,8 @@
 package org.apache.doris.flink;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.GenericContainer;
@@ -177,4 +179,27 @@ public abstract class DorisTestBase {
         }
         return list;
     }
+
+    public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
+        List<String> actual = new ArrayList<>();
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
+            ResultSet sinkResultSet = statement.executeQuery(query);
+            while (sinkResultSet.next()) {
+                List<String> row = new ArrayList<>();
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = sinkResultSet.getObject(i);
+                    if (value == null) {
+                        row.add("null");
+                    } else {
+                        row.add(value.toString());
+                    }
+                }
+                actual.add(StringUtils.join(row, ","));
+            }
+        }
+        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index c9501d3..ad6923c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -28,22 +28,16 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /** DorisSink ITCase with csv and arrow format. */
 public class DorisSinkITCase extends DorisTestBase {
@@ -62,25 +56,9 @@ public class DorisSinkITCase extends DorisTestBase {
         submitJob(TABLE_CSV, properties, new String[] {"doris,1"});
 
         Thread.sleep(10000);
-        Set<List<Object>> actual = new HashSet<>();
-
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet sinkResultSet =
-                    statement.executeQuery(
-                            String.format(
-                                    "select name,age from %s.%s order by 1", DATABASE, TABLE_CSV));
-            while (sinkResultSet.next()) {
-                List<Object> row =
-                        Arrays.asList(sinkResultSet.getString("name"), sinkResultSet.getInt("age"));
-                actual.add(row);
-            }
-        }
-        Set<List<Object>> expected =
-                Stream.<List<Object>>of(Arrays.asList("doris", 1)).collect(Collectors.toSet());
-        Assertions.assertIterableEquals(expected, actual);
+        List<String> expected = Arrays.asList("doris,1");
+        String query = String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_CSV);
+        checkResult(expected, query, 2);
     }
 
     @Test
@@ -107,25 +85,9 @@ public class DorisSinkITCase extends DorisTestBase {
                 });
 
         Thread.sleep(10000);
-        Set<List<Object>> actual = new HashSet<>();
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet sinkResultSet =
-                    statement.executeQuery(
-                            String.format(
-                                    "select name,age from %s.%s order by 1", DATABASE, TABLE_JSON));
-            while (sinkResultSet.next()) {
-                List<Object> row =
-                        Arrays.asList(sinkResultSet.getString("name"), sinkResultSet.getInt("age"));
-                actual.add(row);
-            }
-        }
-        Set<List<Object>> expected =
-                Stream.<List<Object>>of(Arrays.asList("doris1", 1), Arrays.asList("doris2", 2))
-                        .collect(Collectors.toSet());
-        Assertions.assertIterableEquals(expected, actual);
+        List<String> expected = Arrays.asList("doris1,1", "doris2,2");
+        String query = String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON);
+        checkResult(expected, query, 2);
     }
 
     public void submitJob(String table, Properties properties, String[] records) throws Exception {
@@ -180,26 +142,10 @@ public class DorisSinkITCase extends DorisTestBase {
         tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
 
         Thread.sleep(10000);
-        Set<List<Object>> actual = new HashSet<>();
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet sinkResultSet =
-                    statement.executeQuery(
-                            String.format(
-                                    "select name,age from %s.%s order by 1",
-                                    DATABASE, TABLE_JSON_TBL));
-            while (sinkResultSet.next()) {
-                List<Object> row =
-                        Arrays.asList(sinkResultSet.getString("name"), sinkResultSet.getInt("age"));
-                actual.add(row);
-            }
-        }
-        Set<List<Object>> expected =
-                Stream.<List<Object>>of(Arrays.asList("doris", 1), Arrays.asList("flink", 2))
-                        .collect(Collectors.toSet());
-        Assertions.assertIterableEquals(expected, actual);
+        List<String> expected = Arrays.asList("doris,1", "flink,2");
+        String query =
+                String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL);
+        checkResult(expected, query, 2);
     }
 
     private void initializeTable(String table) throws Exception {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index f88b756..bc30c57 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -29,8 +29,8 @@ import org.apache.doris.flink.DorisTestBase;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -65,17 +65,16 @@ public class DorisSourceITCase extends DorisTestBase {
                         .setDorisOptions(dorisBuilder.build())
                         .setDeserializer(new SimpleListDeserializationSchema())
                         .build();
-        List<Object> actual = new ArrayList<>();
+        List<String> actual = new ArrayList<>();
         try (CloseableIterator<List<?>> iterator =
                 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source")
                         .executeAndCollect()) {
             while (iterator.hasNext()) {
-                actual.add(iterator.next());
+                actual.add(iterator.next().toString());
             }
         }
-        List<Object> expected =
-                Arrays.asList(Arrays.asList("doris", 18), Arrays.asList("flink", 10));
-        Assertions.assertIterableEquals(expected, actual);
+        List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]");
+        Assert.assertArrayEquals(actual.toArray(), expected.toArray());
     }
 
     @Test
@@ -102,14 +101,14 @@ public class DorisSourceITCase extends DorisTestBase {
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
 
-        List<Object> actual = new ArrayList<>();
+        List<String> actual = new ArrayList<>();
         try (CloseableIterator<Row> iterator = tableResult.collect()) {
             while (iterator.hasNext()) {
                 actual.add(iterator.next().toString());
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
-        Assertions.assertIterableEquals(Arrays.asList(expected), actual);
+        Assert.assertArrayEquals(expected, actual.toArray());
     }
 
     private void initializeTable(String table) throws Exception {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
index ad40255..8f7998c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
@@ -25,14 +25,13 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
 import org.apache.doris.flink.DorisTestBase;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /** DorisDorisE2ECase. */
@@ -90,7 +89,7 @@ public class DorisDorisE2ECase extends DorisTestBase {
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
-        Assertions.assertIterableEquals(Arrays.asList(expected), actual);
+        Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
     }
 
     private void initializeDorisTable(String table) throws Exception {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 2aecaaf..3c12061 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -30,7 +30,6 @@ import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
@@ -38,21 +37,16 @@ import org.testcontainers.lifecycle.Startables;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.api.common.JobStatus.RUNNING;
@@ -99,12 +93,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         JobClient jobClient = submitJob();
         // wait 2 times checkpoint
         Thread.sleep(20000);
-        Set<List<Object>> expected =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_1", 1),
-                                Arrays.asList("doris_2", 2),
-                                Arrays.asList("doris_3", 3))
-                        .collect(Collectors.toSet());
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
         String sql =
                 "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
         String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
@@ -130,14 +119,9 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         }
 
         Thread.sleep(20000);
-        Set<List<Object>> expected2 =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_1", 18),
-                                Arrays.asList("doris_1_1", 10),
-                                Arrays.asList("doris_2_1", 11),
-                                Arrays.asList("doris_3", 3),
-                                Arrays.asList("doris_3_1", 12))
-                        .collect(Collectors.toSet());
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
         sql =
                 "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
         String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
@@ -160,12 +144,8 @@ public class MySQLDorisE2ECase extends DorisTestBase {
                             DATABASE, TABLE_1));
         }
         Thread.sleep(20000);
-        Set<List<Object>> expected3 =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_1", null),
-                                Arrays.asList("doris_1_1", null),
-                                Arrays.asList("doris_1_1_1", "c1_val"))
-                        .collect(Collectors.toSet());
+        List<String> expected3 =
+                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
         sql = "select * from %s.%s order by 1";
         String query3 = String.format(sql, DATABASE, TABLE_1);
         checkResult(expected3, query3, 2);
@@ -180,12 +160,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         JobClient jobClient = submitJob();
         // wait 2 times checkpoint
         Thread.sleep(20000);
-        Set<List<Object>> expected =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_1", 1),
-                                Arrays.asList("doris_2", 2),
-                                Arrays.asList("doris_3", 3))
-                        .collect(Collectors.toSet());
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
         String sql =
                 "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
         String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
@@ -194,10 +169,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         // auto create table4
         addTableTable_4();
         Thread.sleep(20000);
-        Set<List<Object>> expected2 =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_4_1", 4), Arrays.asList("doris_4_2", 4))
-                        .collect(Collectors.toSet());
+        List<String> expected2 = Arrays.asList("doris_4_1,4", "doris_4_2,4");
         sql = "select * from %s.%s order by 1";
         String query2 = String.format(sql, DATABASE, TABLE_4);
         checkResult(expected2, query2, 2);
@@ -229,16 +201,15 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         }
 
         Thread.sleep(20000);
-        Set<List<Object>> expected3 =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_1", 18),
-                                Arrays.asList("doris_1_1", 10),
-                                Arrays.asList("doris_2_1", 11),
-                                Arrays.asList("doris_3", 3),
-                                Arrays.asList("doris_3_1", 12),
-                                Arrays.asList("doris_4_1", 41),
-                                Arrays.asList("doris_4_3", 43))
-                        .collect(Collectors.toSet());
+        List<String> expected3 =
+                Arrays.asList(
+                        "doris_1,18",
+                        "doris_1_1,10",
+                        "doris_2_1,11",
+                        "doris_3,3",
+                        "doris_3_1,12",
+                        "doris_4_1,41",
+                        "doris_4_3,43");
         sql =
                 "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
         String query3 =
@@ -263,12 +234,8 @@ public class MySQLDorisE2ECase extends DorisTestBase {
                             "insert into %s.%s  values ('doris_4_4','c1_val')", DATABASE, TABLE_4));
         }
         Thread.sleep(20000);
-        Set<List<Object>> expected4 =
-                Stream.<List<Object>>of(
-                                Arrays.asList("doris_4_1", null),
-                                Arrays.asList("doris_4_3", null),
-                                Arrays.asList("doris_4_4", "c1_val"))
-                        .collect(Collectors.toSet());
+        List<String> expected4 =
+                Arrays.asList("doris_4_1,null", "doris_4_3,null", "doris_4_4,c1_val");
         sql = "select * from %s.%s order by 1";
         String query4 = String.format(sql, DATABASE, TABLE_4);
         checkResult(expected4, query4, 2);
@@ -288,25 +255,6 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         }
     }
 
-    public void checkResult(Set<List<Object>> expected, String query, int columnSize)
-            throws Exception {
-        Set<List<Object>> actual = new HashSet<>();
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet sinkResultSet = statement.executeQuery(query);
-            while (sinkResultSet.next()) {
-                List<Object> row = new ArrayList<>();
-                for (int i = 1; i <= columnSize; i++) {
-                    row.add(sinkResultSet.getObject(i));
-                }
-                actual.add(row);
-            }
-        }
-        Assertions.assertIterableEquals(expected, actual);
-    }
-
     public JobClient submitJob() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to