This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit a18cacec34e4b2b9ca15c1bfd9998694eed46da3
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Wed Jun 5 12:09:46 2024 +0800

    [FLINK-35092][cdc][starrocks] Add starrocks integration test cases
---
 .../flink-cdc-pipeline-connector-doris/pom.xml     |  12 +
 .../flink-cdc-pipeline-connector-starrocks/pom.xml |  18 +
 .../org/apache/commons/compress/utils/Lists.java   |  31 ++
 .../sink/StarRocksMetadataApplierITCase.java       | 388 +++++++++++++++++++++
 .../starrocks/sink/StarRocksPipelineITCase.java    | 178 ++++++++++
 .../starrocks/sink/utils/StarRocksContainer.java   | 113 ++++++
 .../sink/utils/StarRocksSinkTestBase.java          | 252 +++++++++++++
 .../flink-cdc-pipeline-e2e-tests/pom.xml           |   7 +-
 8 files changed, 998 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index ed166767c..9c3c92ecb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -114,6 +114,18 @@ limitations under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test-jar</id>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
index 342044827..e410ad227 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
@@ -52,6 +52,24 @@ limitations under the License.
             <artifactId>flink-cdc-composer</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>jdbc</artifactId>
+            <version>1.18.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java
new file mode 100644
index 000000000..d029943e3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.shade.org.apache.commons.compress.utils;
+
+import java.util.ArrayList;
+
+/**
+ * Dummy class of shaded apache-commons since connector 1.2.9 depends on this, but does not package it.
+ * This package should be removed after upgrading to 1.2.10, which will not use commons-compress
+ * anymore.
+ */
+public class Lists {
+    public static <E> ArrayList<E> newArrayList() {
+        return new ArrayList<>();
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
new file mode 100644
index 000000000..c294dd423
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksMetadataApplier}. */
+public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeClass
+    public static void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Before
+    public void initializeDatabase() {
+        executeSql(
+                String.format(
+                        "CREATE DATABASE IF NOT EXISTS `%s`;",
+                        StarRocksContainer.STARROCKS_DATABASE_NAME));
+        LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+    }
+
+    @After
+    public void destroyDatabase() {
+        executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
+        LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+    }
+
+    private List<Event> generateAddColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn(
+                                                "extra_bool", DataTypes.BOOLEAN(), null)))),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn(
+                                                "extra_decimal",
+                                                DataTypes.DECIMAL(17, 0),
+                                                null)))));
+    }
+
+    private List<Event> generateDropColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new DropColumnEvent(tableId, Collections.singletonList("number")));
+    }
+
+    private List<Event> generateRenameColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
+                new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
+    }
+
+    private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new AlterColumnTypeEvent(
+                        tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
+    }
+
+    private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                // Double -> Float is a narrowing cast, should fail
+                new AlterColumnTypeEvent(
+                        tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
+    }
+
+    @Test
+    public void testStarRocksDataType() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
+                        // StarRocks sink doesn't support BINARY and BYTES type yet.
+                        // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
+                        // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
+                        // Binary"))
+                        // .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
+                        .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
+                        .column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
+                        .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
+                        .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
+                        .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
+                        .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
+                        .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
+                        .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
+                        .column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
+                        .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
+                        .column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
+                        // StarRocks sink doesn't support TIME type yet.
+                        // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
+                        // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
+                        // Precision"))
+                        .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
+                        .column(
+                                new PhysicalColumn(
+                                        "timestamp_3",
+                                        DataTypes.TIMESTAMP(3),
+                                        "Timestamp With Precision"))
+                        // StarRocks sink doesn't support TIMESTAMP with non-local TZ yet.
+                        // .column(new PhysicalColumn("timestamptz", DataTypes.TIMESTAMP_TZ(),
+                        // "TimestampTZ"))
+                        // .column(new PhysicalColumn("timestamptz_3", DataTypes.TIMESTAMP_TZ(3),
+                        // "TimestampTZ With Precision"))
+                        .column(
+                                new PhysicalColumn(
+                                        "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
+                        .column(
+                                new PhysicalColumn(
+                                        "timestampltz_3",
+                                        DataTypes.TIMESTAMP_LTZ(3),
+                                        "TimestampLTZ With Precision"))
+                        .primaryKey("id")
+                        .build();
+
+        runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
+
+        List<String> actual = inspectTableSchema(tableId);
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "boolean | boolean | YES | false | null",
+                        "int | int | YES | false | null",
+                        "tinyint | tinyint | YES | false | null",
+                        "smallint | smallint | YES | false | null",
+                        "float | float | YES | false | null",
+                        "double | double | YES | false | null",
+                        "char | char(51) | YES | false | null",
+                        "varchar | varchar(51) | YES | false | null",
+                        "string | varchar(1048576) | YES | false | null",
+                        "decimal | decimal(17,7) | YES | false | null",
+                        "date | date | YES | false | null",
+                        "timestamp | datetime | YES | false | null",
+                        "timestamp_3 | datetime | YES | false | null",
+                        "timestampltz | datetime | YES | false | null",
+                        "timestampltz_3 | datetime | YES | false | null");
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test
+    public void testStarRocksAddColumn() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        runJobWithEvents(generateAddColumnEvents(tableId));
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "number | double | YES | false | null",
+                        "name | varchar(51) | YES | false | null",
+                        "extra_date | date | YES | false | null",
+                        "extra_bool | boolean | YES | false | null",
+                        "extra_decimal | decimal(17,0) | YES | false | null");
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test
+    public void testStarRocksDropColumn() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        runJobWithEvents(generateDropColumnEvents(tableId));
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null", "name | varchar(51) | YES | false | null");
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test
+    @Ignore("Rename column is not supported currently.")
+    public void testStarRocksRenameColumn() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        runJobWithEvents(generateRenameColumnEvents(tableId));
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "kazu | double | YES | false | null",
+                        "namae | varchar(51) | YES | false | null");
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test
+    @Ignore("Alter column type is not supported currently.")
+    public void testStarRocksAlterColumnType() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        runJobWithEvents(generateAlterColumnTypeEvents(tableId));
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "number | double | YES | false | null",
+                        "name | varchar(57) | YES | false | null");
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test(expected = JobExecutionException.class)
+    @Ignore("Alter column type is not supported currently.")
+    public void testStarRocksNarrowingAlterColumnType() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
+    }
+
+    private void runJobWithEvents(List<Event> events) throws Exception {
+        DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));
+
+        Configuration config =
+                new Configuration()
+                        .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
+                        .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
+                        .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
+                        .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
+
+        DataSink starRocksSink = createStarRocksDataSink(config);
+
+        SchemaOperatorTranslator schemaOperatorTranslator =
+                new SchemaOperatorTranslator(
+                        SchemaChangeBehavior.EVOLVE,
+                        "$$_schema_operator_$$",
+                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+
+        OperatorIDGenerator schemaOperatorIDGenerator =
+                new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
+
+        stream =
+                schemaOperatorTranslator.translate(
+                        stream,
+                        DEFAULT_PARALLELISM,
+                        starRocksSink.getMetadataApplier(),
+                        new ArrayList<>());
+
+        DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+        sinkTranslator.translate(
+                new SinkDef("starrocks", "Dummy StarRocks Sink", config),
+                stream,
+                starRocksSink,
+                schemaOperatorIDGenerator.generate());
+
+        env.execute("StarRocks Schema Evolution Test");
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java
new file mode 100644
index 000000000..43c1faaac
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksDataSink}. */
+public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeClass
+    public static void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Before
+    public void initializeDatabaseAndTable() {
+        executeSql(
+                String.format(
+                        "CREATE DATABASE IF NOT EXISTS `%s`;",
+                        StarRocksContainer.STARROCKS_DATABASE_NAME));
+
+        LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+
+        List<String> schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
+
+        executeSql(
+                String.format(
+                        "CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME,
+                        String.join(", ", schema),
+                        "id",
+                        "id"));
+
+        LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME);
+    }
+
+    @After
+    public void destroyDatabaseAndTable() {
+
+        executeSql(
+                String.format(
+                        "DROP TABLE %s.%s;",
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME));
+
+        LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
+
+        LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+    }
+
+    private List<Event> generateEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                })),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    21, 1.732, BinaryStringData.fromString("Disenchanted")
+                                })),
+                DataChangeEvent.deleteEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                })),
+                DataChangeEvent.updateEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
+                        generator.generate(
+                                new Object[] {
+                                    17, 6.28, BinaryStringData.fromString("StarRocks")
+                                })));
+    }
+
+    @Test
+    public void testValuesToStarRocks() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+        DataStream<Event> stream =
+                env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));
+
+        Configuration config =
+                new Configuration()
+                        .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
+                        .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
+                        .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
+                        .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
+
+        Sink<Event> starRocksSink =
+                ((FlinkSinkProvider) createStarRocksDataSink(config).getEventSinkProvider())
+                        .getSink();
+        stream.sinkTo(starRocksSink);
+
+        env.execute("Values to StarRocks Sink");
+
+        List<String> actual = fetchTableContent(tableId, 3);
+        List<String> expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
+
+        assertEqualsInAnyOrder(expected, actual);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java
new file mode 100644
index 000000000..8bba7053e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink.utils;
+
+import org.junit.ClassRule;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Docker container for StarRocks. */
+public class StarRocksContainer extends JdbcDatabaseContainer<StarRocksContainer> {
+
+    private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
+
+    // exposed ports
+    public static final int FE_HTTP_SERVICE_PORT = 8080;
+    public static final int FE_QUERY_PORT = 9030;
+
+    public static final String STARROCKS_DATABASE_NAME = "starrocks_database";
+    public static final String STARROCKS_TABLE_NAME = "fallen_angel";
+    public static final String STARROCKS_USERNAME = "root";
+    public static final String STARROCKS_PASSWORD = "";
+
+    @ClassRule public static final Network NETWORK = Network.newNetwork();
+
+    public StarRocksContainer() {
+        super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+        setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
+        setNetwork(NETWORK);
+    }
+
+    public StarRocksContainer(Network network) {
+        super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+        setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
+        setNetwork(network);
+    }
+
+    public List<String> getLoadUrl() {
+        return Collections.singletonList(
+                String.format("%s:%d", getHost(), getMappedPort(FE_HTTP_SERVICE_PORT)));
+    }
+
+    public void waitForLog(String regex, int count, int timeoutSeconds) {
+        new LogMessageWaitStrategy()
+                .withRegEx(regex)
+                .withTimes(count)
+                .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
+                .waitUntilReady(this);
+    }
+
+    @Override
+    public String getDriverClassName() {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            return "com.mysql.cj.jdbc.Driver";
+        } catch (ClassNotFoundException e) {
+            return "com.mysql.jdbc.Driver";
+        }
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl("");
+    }
+
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:mysql://"
+                + getHost()
+                + ":"
+                + getMappedPort(FE_QUERY_PORT)
+                + "/"
+                + databaseName
+                + additionalUrlParams;
+    }
+
+    @Override
+    public String getUsername() {
+        return STARROCKS_USERNAME;
+    }
+
+    @Override
+    public String getPassword() {
+        return STARROCKS_PASSWORD;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
new file mode 100644
index 000000000..4980c603b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink.utils;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSink;
+import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Base class for testing {@link StarRocksDataSink}. */
+public class StarRocksSinkTestBase extends TestLogger {
+    protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class);
+
+    protected static final int DEFAULT_PARALLELISM = 1;
+
+    protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer();
+
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+
+    private static StarRocksContainer createStarRocksContainer() {
+        return new StarRocksContainer();
+    }
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join();
+        LOG.info("Waiting for StarRocks to launch");
+
+        long startWaitingTimestamp = System.currentTimeMillis();
+
+        new LogMessageWaitStrategy()
+                .withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s")
+                .withTimes(1)
+                .withStartupTimeout(
+                        Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+                .waitUntilReady(STARROCKS_CONTAINER);
+
+        while (!checkBackendAvailability()) {
+            try {
+                if (System.currentTimeMillis() - startWaitingTimestamp
+                        > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
+                    throw new RuntimeException("StarRocks backend startup timed out.");
+                }
+                LOG.info("Waiting for backends to be available");
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+                // ignore and check next round
+            }
+        }
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        STARROCKS_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    static class MockContext implements Factory.Context {
+
+        Configuration factoryConfiguration;
+
+        public MockContext(Configuration factoryConfiguration) {
+            this.factoryConfiguration = factoryConfiguration;
+        }
+
+        @Override
+        public Configuration getFactoryConfiguration() {
+            return factoryConfiguration;
+        }
+
+        @Override
+        public Configuration getPipelineConfiguration() {
+            return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
+        }
+
+        @Override
+        public ClassLoader getClassLoader() {
+            return null;
+        }
+    }
+
+    public static DataSink createStarRocksDataSink(Configuration factoryConfiguration) {
+        StarRocksDataSinkFactory factory = new StarRocksDataSinkFactory();
+        return factory.createDataSink(new MockContext(factoryConfiguration));
+    }
+
+    public static void executeSql(String sql) {
+        try {
+            Container.ExecResult rs =
+                    STARROCKS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            "-e " + sql);
+
+            if (rs.getExitCode() != 0) {
+                throw new RuntimeException("Failed to execute SQL." + rs.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to execute SQL.", e);
+        }
+    }
+
+    public static boolean checkBackendAvailability() {
+        try {
+            Container.ExecResult rs =
+                    STARROCKS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            "-e SHOW BACKENDS\\G");
+
+            if (rs.getExitCode() != 0) {
+                return false;
+            }
+            return rs.getStdout()
+                    .contains("*************************** 1. row ***************************");
+        } catch (Exception e) {
+            LOG.info("Failed to check backend status.", e);
+            return false;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    public List<String> inspectTableSchema(TableId tableId) throws SQLException {
+        List<String> results = new ArrayList<>();
+        ResultSet rs =
+                STARROCKS_CONTAINER
+                        .createConnection("")
+                        .createStatement()
+                        .executeQuery(
+                                String.format(
+                                        "DESCRIBE `%s`.`%s`",
+                                        tableId.getSchemaName(), tableId.getTableName()));
+
+        while (rs.next()) {
+            List<String> columns = new ArrayList<>();
+            for (int i = 1; i <= 5; i++) {
+                columns.add(rs.getString(i));
+            }
+            results.add(String.join(" | ", columns));
+        }
+        return results;
+    }
+
+    public List<String> fetchTableContent(TableId tableId, int columnCount) throws SQLException {
+        List<String> results = new ArrayList<>();
+        ResultSet rs =
+                STARROCKS_CONTAINER
+                        .createConnection("")
+                        .createStatement()
+                        .executeQuery(
+                                String.format(
+                                        "SELECT * FROM %s.%s",
+                                        tableId.getSchemaName(), tableId.getTableName()));
+
+        while (rs.next()) {
+            List<String> columns = new ArrayList<>();
+            for (int i = 1; i <= columnCount; i++) {
+                columns.add(rs.getString(i));
+            }
+            results.add(String.join(" | ", columns));
+        }
+        return results;
+    }
+
+    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+    }
+
+    public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        for (String key : expected.keySet()) {
+            assertEquals(expected.get(key), actual.get(key));
+        }
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 2326240b6..3b3e60353 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -85,6 +85,12 @@ limitations under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
@@ -96,7 +102,6 @@ limitations under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
