This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 46d4567b [Improve](test) improve some ut and itcase (#486)
46d4567b is described below
commit 46d4567baeeb525acbb5590e26727b646880ab20
Author: wudi <[email protected]>
AuthorDate: Wed Sep 11 10:37:28 2024 +0800
[Improve](test) improve some ut and itcase (#486)
---
flink-doris-connector/pom.xml | 6 +
.../doris/flink/catalog/DorisCatalogITCase.java | 2 +-
.../flink/container/AbstractContainerTestBase.java | 23 ++
.../doris/flink/container/AbstractE2EService.java | 1 +
.../flink/container/AbstractITCaseService.java | 21 +-
.../flink/container/e2e/Doris2DorisE2ECase.java | 2 +-
.../apache/doris/flink/sink/DorisSinkITCase.java | 35 +--
.../doris/flink/source/DorisSourceITCase.java | 240 ++++++++++++++++++---
.../enumerator/DorisSourceEnumeratorTest.java | 111 ++++++++++
9 files changed, 388 insertions(+), 53 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 64ca7392..d773339b 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -423,6 +423,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
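The new flink-connector-test-utils dependency supplies Flink's source-testing harness, including the TestingSplitEnumeratorContext that the DorisSourceEnumeratorTest added at the end of this patch builds on.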
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
index b3a3ce04..099f6ebd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
@@ -146,7 +146,7 @@ public class DorisCatalogITCase extends AbstractITCaseService {
props.put("sink.enable-2pc", "false");
catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
- tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM);
// Use doris catalog.
tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tEnv.useCatalog(TEST_CATALOG_NAME);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 967e6f36..61e0faac 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -24,11 +24,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public abstract class AbstractContainerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
private static ContainerService dorisContainerService;
+ public static final int DEFAULT_PARALLELISM = 2;
@BeforeClass
public static void initContainers() {
@@ -88,4 +95,20 @@ public abstract class AbstractContainerTestBase {
dorisContainerService.close();
LOG.info("Doris container was closed.");
}
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+ public static void assertEqualsInAnyOrder(List<Object> expected, List<Object> actual) {
+ assertTrue(expected != null && actual != null);
+ assertEqualsInOrder(
+ expected.stream().sorted().collect(Collectors.toList()),
+ actual.stream().sorted().collect(Collectors.toList()));
+ }
+
+ public static void assertEqualsInOrder(List<Object> expected, List<Object> actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0]));
+ }
}
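Both helpers compare List<Object>, and the any-order variant sorts each side before delegating, so elements must be mutually comparable (the ITCases below pass strings). A minimal usage sketch with illustrative values:

    List<Object> expected = Arrays.asList("+I[doris, 18]", "+I[flink, 10]");
    List<Object> actual = Arrays.asList("+I[flink, 10]", "+I[doris, 18]");
    // Passes: both lists are sorted into the same order before the element-wise compare.
    assertEqualsInAnyOrder(expected, actual);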
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
index 527f82cc..ec536ee6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -113,6 +113,7 @@ public abstract class AbstractE2EService extends AbstractContainerTestBase {
private StreamExecutionEnvironment configFlinkEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
index 956b8be6..6628933c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -23,12 +23,8 @@ import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.function.SupplierWithException;
-import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,16 +81,6 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
}
}
- @Rule
- public final MiniClusterWithClientResource miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(2)
- .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .withHaLeadershipControl()
- .build());
-
/** The type of failover. */
protected enum FailoverType {
TM,
@@ -138,4 +124,11 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
+
+ protected void sleepMs(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException ignored) {
+ }
+ }
}
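The relocated sleepMs helper deliberately swallows InterruptedException, which is adequate for its only current use as a short delay inside triggerFailover callbacks. If it ever gains callers that must observe cancellation, a variant that restores the interrupt flag would be safer; a sketch of that alternative (not what this commit does):

    protected void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Re-set the flag so the caller can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }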
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
index fcb4858a..4b4e3b26 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -55,7 +55,7 @@ public class Doris2DorisE2ECase extends AbstractE2EService {
LOG.info("Start executing the test case of doris to doris.");
initializeDorisTable();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(2);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 50bcf6be..96562fa4 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -22,8 +22,11 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -35,6 +38,7 @@ import org.apache.doris.flink.sink.DorisSink.Builder;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.doris.flink.utils.MockSource;
+import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +69,16 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(2)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
@Test
public void testSinkCsvFormat() throws Exception {
initializeTable(TABLE_CSV);
@@ -131,6 +145,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(DEFAULT_PARALLELISM);
Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
@@ -147,7 +162,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
public void testTableSinkJsonFormat() throws Exception {
initializeTable(TABLE_JSON_TBL);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -196,7 +211,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
public void testTableBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_TBL);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -244,6 +259,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
initializeTable(TABLE_CSV_BATCH_DS);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(DEFAULT_PARALLELISM);
DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
@@ -283,7 +299,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
public void testTableGroupCommit() throws Exception {
initializeTable(TABLE_GROUP_COMMIT);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -332,7 +348,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
public void testTableGzFormat() throws Exception {
initializeTable(TABLE_GZ_FORMAT);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -374,7 +390,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
LOG.info("start to test JobManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
@@ -434,7 +450,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
LOG.info("start to test TaskManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
@@ -486,13 +502,6 @@ public class DorisSinkITCase extends AbstractITCaseService {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
- private void sleepMs(long millis) {
- try {
- Thread.sleep(millis);
- } catch (InterruptedException ignored) {
- }
- }
-
private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
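Net effect of the refactor so far: the MiniClusterWithClientResource rule now lives only in the ITCases that exercise failover (here, and in DorisSourceITCase below), while sleepMs moved the other way into AbstractITCaseService so the new source failover tests can reuse it.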
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 783e6bda..96a08d1c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -17,11 +17,16 @@
package org.apache.doris.flink.source;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -31,18 +36,17 @@ import org.apache.doris.flink.container.AbstractITCaseService;
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
-import org.apache.doris.flink.exception.DorisRuntimeException;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import java.util.Set;
/** DorisSource ITCase. */
public class DorisSourceITCase extends AbstractITCaseService {
@@ -56,13 +60,25 @@ public class DorisSourceITCase extends AbstractITCaseService {
private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
"tbl_read_tbl_push_down_with_union_all";
+ static final String TABLE_CSV_JM = "tbl_csv_jm_source";
+ static final String TABLE_CSV_TM = "tbl_csv_tm_source";
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(2)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
@Test
public void testSource() throws Exception {
initializeTable(TABLE_READ);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-
+ env.setParallelism(DEFAULT_PARALLELISM);
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes(getFenodes())
@@ -84,13 +100,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
- checkResult("testSource", expected.toArray(), actual.toArray());
+ checkResultInAnyOrder("testSource", expected.toArray(), actual.toArray());
}
@Test
public void testOldSourceApi() throws Exception {
initializeTable(TABLE_READ_OLD_API);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
Properties properties = new Properties();
properties.put("fenodes", getFenodes());
properties.put("username", getDorisUsername());
@@ -109,14 +126,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
- checkResult("testOldSourceApi", expected.toArray(), actual.toArray());
+ checkResultInAnyOrder("testOldSourceApi", expected.toArray(), actual.toArray());
}
@Test
public void testTableSource() throws Exception {
initializeTable(TABLE_READ_TBL);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -146,7 +163,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
- Assert.assertArrayEquals(expected, actual.toArray());
+ assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray()));
// filter query
List<String> actualFilter = new ArrayList<>();
@@ -158,14 +175,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
String[] expectedFilter = new String[] {"+I[doris, 18]"};
- checkResult("testTableSource", expectedFilter, actualFilter.toArray());
+ checkResultInAnyOrder("testTableSource", expectedFilter,
actualFilter.toArray());
}
@Test
public void testTableSourceOldApi() throws Exception {
initializeTable(TABLE_READ_TBL_OLD_API);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -195,14 +212,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
- checkResult("testTableSourceOldApi", expected, actual.toArray());
+ checkResultInAnyOrder("testTableSourceOldApi", expected, actual.toArray());
}
@Test
public void testTableSourceAllOptions() throws Exception {
initializeTable(TABLE_READ_TBL_ALL_OPTIONS);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -241,14 +258,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
- checkResult("testTableSourceAllOptions", expected, actual.toArray());
+ checkResultInAnyOrder("testTableSourceAllOptions", expected, actual.toArray());
}
@Test
public void testTableSourceFilterAndProjectionPushDown() throws Exception {
initializeTable(TABLE_READ_TBL_PUSH_DOWN);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -279,15 +296,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
}
String[] expected = new String[] {"+I[18]"};
- checkResult("testTableSourceFilterAndProjectionPushDown", expected,
actual.toArray());
+ checkResultInAnyOrder(
+ "testTableSourceFilterAndProjectionPushDown", expected,
actual.toArray());
}
@Test
- public void testTableSourceFilterWithUnionAll() {
+ public void testTableSourceFilterWithUnionAll() throws Exception {
LOG.info("starting to execute testTableSourceFilterWithUnionAll
case.");
initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -318,14 +336,134 @@ public class DorisSourceITCase extends AbstractITCaseService {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
- } catch (Exception e) {
- LOG.error("Failed to execute sql. sql={}", querySql, e);
- throw new DorisRuntimeException(e);
}
- Set<String> expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]"));
- for (String a : actual) {
- Assert.assertTrue(expected.contains(a));
+
+ String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
+ checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected,
actual.toArray());
+ }
+
+ @Test
+ public void testJobManagerFailoverSource() throws Exception {
+ LOG.info("start to test JobManagerFailoverSource.");
+ initializeTableWithData(TABLE_CSV_JM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_jm ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_CSV_JM,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from doris_source_jm");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+
+ List<String> expectedData = getExpectedData();
+ if (iterator.hasNext()) {
+ LOG.info("trigger jobmanager failover...");
+ triggerFailover(
+ FailoverType.JM,
+ jobId,
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+ List<String> actual = fetchRows(iterator);
+ LOG.info("actual data: {}, expected: {}", actual, expectedData);
+ Assert.assertTrue(actual.size() >= expectedData.size());
+ Assert.assertTrue(actual.containsAll(expectedData));
+ }
+
+ private static List<String> getExpectedData() {
+ String[] expected =
+ new String[] {
+ "+I[101, 1]",
+ "+I[102, 1]",
+ "+I[103, 1]",
+ "+I[201, 2]",
+ "+I[202, 2]",
+ "+I[203, 2]",
+ "+I[301, 3]",
+ "+I[302, 3]",
+ "+I[303, 3]",
+ "+I[401, 4]",
+ "+I[402, 4]",
+ "+I[403, 4]",
+ "+I[501, 5]",
+ "+I[502, 5]",
+ "+I[503, 5]",
+ "+I[601, 6]",
+ "+I[602, 6]",
+ "+I[603, 6]",
+ "+I[701, 7]",
+ "+I[702, 7]",
+ "+I[703, 7]",
+ "+I[801, 8]",
+ "+I[802, 8]",
+ "+I[803, 8]",
+ "+I[901, 9]",
+ "+I[902, 9]",
+ "+I[903, 9]"
+ };
+ return Arrays.asList(expected);
+ }
+
+ @Test
+ public void testTaskManagerFailoverSource() throws Exception {
+ LOG.info("start to test TaskManagerFailoverSource.");
+ initializeTableWithData(TABLE_CSV_TM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_tm ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_CSV_TM,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from doris_source_tm");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+ List<String> expectedData = getExpectedData();
+ if (iterator.hasNext()) {
+ LOG.info("trigger taskmanager failover...");
+ triggerFailover(
+ FailoverType.TM,
+ jobId,
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
}
+
+ List<String> actual = fetchRows(iterator);
+ LOG.info("actual data: {}, expected: {}", actual, expectedData);
+ Assert.assertTrue(actual.size() >= expectedData.size());
+ Assert.assertTrue(actual.containsAll(expectedData));
}
private void checkResult(String testName, Object[] expected, Object[] actual) {
@@ -337,6 +475,15 @@ public class DorisSourceITCase extends AbstractITCaseService {
Assert.assertArrayEquals(expected, actual);
}
+ private void checkResultInAnyOrder(String testName, Object[] expected, Object[] actual) {
+ LOG.info(
+ "Checking DorisSourceITCase result. testName={}, actual={}, expected={}",
+ testName,
+ actual,
+ expected);
+ assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual));
+ }
+
private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
@@ -347,7 +494,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`age` int\n"
- + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
@@ -356,4 +503,49 @@ public class DorisSourceITCase extends AbstractITCaseService {
String.format("insert into %s.%s values ('flink',10)", DATABASE, table),
String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
}
+
+ private void initializeTableWithData(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int\n"
+ + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('201',2),('202',2),('203',2)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('301',3),('302',3),('303',3)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('401',4),('402',4),('403',4)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('501',5),('502',5),('503',5)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('601',6),('602',6),('603',6)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('701',7),('702',7),('703',7)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('801',8),('802',8),('803',8)", DATABASE, table),
+ String.format(
+ "insert into %s.%s values ('901',9),('902',9),('903',9)",
+ DATABASE, table));
+ }
+
+ private static List<String> fetchRows(Iterator<Row> iter) {
+ List<String> rows = new ArrayList<>();
+ while (iter.hasNext()) {
+ Row row = iter.next();
+ rows.add(row.toString());
+ }
+ return rows;
+ }
}
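The two new failover tests assert at-least-once rather than exactly-once delivery: after a JobManager or TaskManager restart the source may replay splits, so the collected rows are checked for containment and a lower bound on count instead of strict equality. Schematically (names mirror the test code above):

    // At-least-once check: rows replayed after recovery are tolerated.
    List<String> actual = fetchRows(iterator);
    Assert.assertTrue(actual.size() >= expectedData.size()); // duplicates allowed
    Assert.assertTrue(actual.containsAll(expectedData)); // but nothing may be lost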
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
new file mode 100644
index 00000000..68789015
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.assigners.SimpleSplitAssigner;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link DorisSourceEnumerator}. */
+public class DorisSourceEnumeratorTest {
+ private static long splitId = 1L;
+ private TestingSplitEnumeratorContext<DorisSourceSplit> context;
+ private DorisSourceSplit split;
+ private DorisSourceEnumerator enumerator;
+
+ @BeforeEach
+ void setup() {
+ this.context = new TestingSplitEnumeratorContext<>(2);
+ this.split = createRandomSplit();
+ this.enumerator = createEnumerator(context, split);
+ }
+
+ @Test
+ void testCheckpointNoSplitRequested() throws Exception {
+ PendingSplitsCheckpoint state = enumerator.snapshotState(1L);
+ assertThat(state.getSplits()).contains(split);
+ }
+
+ @Test
+ void testRestoreEnumerator() throws Exception {
+ PendingSplitsCheckpoint state = enumerator.snapshotState(1L);
+
+ DorisSource<String> source = DorisSource.<String>builder().build();
+ SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> restoreEnumerator =
+ source.restoreEnumerator(context, state);
+ PendingSplitsCheckpoint pendingSplitsCheckpoint = restoreEnumerator.snapshotState(1L);
+ assertThat(pendingSplitsCheckpoint.getSplits()).contains(split);
+ }
+
+ @Test
+ void testSplitRequestForRegisteredReader() throws Exception {
+ context.registerReader(1, "somehost");
+ enumerator.addReader(1);
+ enumerator.handleSplitRequest(1, "somehost");
+ assertThat(enumerator.snapshotState(1L).getSplits()).isEmpty();
+ assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+ }
+
+ @Test
+ void testSplitRequestForNonRegisteredReader() throws Exception {
+ enumerator.handleSplitRequest(1, "somehost");
+ assertThat(context.getSplitAssignments()).doesNotContainKey(1);
+ assertThat(enumerator.snapshotState(1L).getSplits()).contains(split);
+ }
+
+ @Test
+ void testNoMoreSplits() {
+ // first split assignment
+ context.registerReader(1, "somehost");
+ enumerator.addReader(1);
+ enumerator.handleSplitRequest(1, "somehost");
+
+ // second request has no more split
+ enumerator.handleSplitRequest(1, "somehost");
+
+ assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+ assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
+ }
+
+ private static DorisSourceSplit createRandomSplit() {
+ Set<Long> tabletIds = new HashSet<>();
+ tabletIds.add(1001L);
+ return new DorisSourceSplit(
+ String.valueOf(splitId),
+ new PartitionDefinition("db", "tbl", "127.0.0.1", tabletIds, "queryPlan"));
+ }
+
+ private static DorisSourceEnumerator createEnumerator(
+ final SplitEnumeratorContext<DorisSourceSplit> context,
+ final DorisSourceSplit... splits) {
+ return new DorisSourceEnumerator(context, new SimpleSplitAssigner(Arrays.asList(splits)));
+ }
+}
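One detail of the new test worth noting: splitId is static and never incremented, so createRandomSplit always yields a split with id "1". That is sufficient here because each test method starts from a fresh TestingSplitEnumeratorContext holding a single pending split.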
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]