(doris-spark-connector) branch master updated: [feature] support not equals and like and not like filter push down (#219)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 3901055 [feature] support not equals and like and not like filter push down (#219) 3901055 is described below commit 390105505a09c9bdc2073094239b629c5d7ce61c Author: gnehil AuthorDate: Mon Jul 29 16:24:17 2024 +0800 [feature] support not equals and like and not like filter push down (#219) --- .../scala/org/apache/doris/spark/sql/Utils.scala| 7 +++ .../org/apache/doris/spark/sql/TestUtils.scala | 21 ++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala index 7cffbe5..0400b04 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -51,6 +51,7 @@ private[spark] object Utils { def compileFilter(filter: Filter, dialect: JdbcDialect, inValueLengthLimit: Int): Option[String] = { Option(filter match { case EqualTo(attribute, value) => s"${quote(attribute)} = ${compileValue(value)}" + case Not(EqualTo(attribute, value)) => s"${quote(attribute)} != ${compileValue(value)}" case GreaterThan(attribute, value) => s"${quote(attribute)} > ${compileValue(value)}" case GreaterThanOrEqual(attribute, value) => s"${quote(attribute)} >= ${compileValue(value)}" case LessThan(attribute, value) => s"${quote(attribute)} < ${compileValue(value)}" @@ -83,6 +84,12 @@ private[spark] object Utils { } else { null } + case StringContains(attribute, value) => s"${quote(attribute)} like '%$value%'" + case Not(StringContains(attribute, value)) => s"${quote(attribute)} not like '%$value%'" + case StringEndsWith(attribute, value) => s"${quote(attribute)} like 
'%$value'" + case Not(StringEndsWith(attribute, value)) => s"${quote(attribute)} not like '%$value'" + case StringStartsWith(attribute, value) => s"${quote(attribute)} like '$value%'" + case Not(StringStartsWith(attribute, value)) => s"${quote(attribute)} not like '$value%'" case _ => null }) } diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestUtils.scala index b1affbf..7e7919a 100644 --- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestUtils.scala +++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestUtils.scala @@ -34,6 +34,7 @@ class TestUtils extends ExpectedExceptionTest { val inValueLengthLimit = 5 val equalFilter = EqualTo("left", 5) +val notEqualFilter = Not(EqualTo("left", 5)) val greaterThanFilter = GreaterThan("left", 5) val greaterThanOrEqualFilter = GreaterThanOrEqual("left", 5) val lessThanFilter = LessThan("left", 5) @@ -41,15 +42,22 @@ class TestUtils extends ExpectedExceptionTest { val validInFilter = In("left", Array(1, 2, 3, 4)) val emptyInFilter = In("left", Array.empty) val invalidInFilter = In("left", Array(1, 2, 3, 4, 5)) +val notInFilter = Not(In("left", Array(1, 2, 3))) val isNullFilter = IsNull("left") val isNotNullFilter = IsNotNull("left") -val notSupportFilter = StringContains("left", "right") val validAndFilter = And(equalFilter, greaterThanFilter) -val invalidAndFilter = And(equalFilter, notSupportFilter) +val invalidAndFilter = And(equalFilter, invalidInFilter) val validOrFilter = Or(equalFilter, greaterThanFilter) -val invalidOrFilter = Or(equalFilter, notSupportFilter) +val invalidOrFilter = Or(equalFilter, invalidInFilter) +val stringContainsFilter = StringContains("left", "right") +val notStringContainsFilter = Not(StringContains("left", "right")) +val stringEndsWithFilter = StringEndsWith("left", "right") +val notStringEndsWithFilter = Not(StringEndsWith("left", "right")) +val 
stringStartsWithFilter = StringStartsWith("left", "right") +val notStringStartsWithFilter = Not(StringStartsWith("left", "right")) Assert.assertEquals("`left` = 5", Utils.compileFilter(equalFilter, dialect, inValueLengthLimit).get) +Assert.assertEquals("`left` != 5"
(doris-spark-connector) branch master updated: [bug] fix bitmap and hll type column access issue (#218)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 5a3177b [bug] fix bitmap and hll type column access issue (#218) 5a3177b is described below commit 5a3177be4b49afd5bb5f8f54a1344ad903c03853 Author: gnehil AuthorDate: Mon Jul 29 15:54:23 2024 +0800 [bug] fix bitmap and hll type column access issue (#218) --- .../doris/spark/cfg/ConfigurationOptions.java | 7 -- .../org/apache/doris/spark/rest/RestService.java | 106 +++-- .../apache/doris/spark/serialization/RowBatch.java | 17 ++-- .../apache/doris/spark/rdd/ScalaValueReader.scala | 2 +- .../org/apache/doris/spark/sql/DorisRelation.scala | 5 + .../org/apache/doris/spark/sql/SchemaUtils.scala | 62 .../apache/doris/spark/sql/TestSchemaUtils.scala | 36 +++ 7 files changed, 125 insertions(+), 110 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index a0fea83..61d4563 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -93,13 +93,6 @@ public interface ConfigurationOptions { int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50; -/** - * set types to ignore, split by comma - * e.g. 
- * "doris.ignore-type"="bitmap,hll" - */ -String DORIS_IGNORE_TYPE = "doris.ignore-type"; - String DORIS_SINK_ENABLE_2PC = "doris.sink.enable-2pc"; boolean DORIS_SINK_ENABLE_2PC_DEFAULT = false; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java index 412b6a8..3f3516f 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -17,56 +17,21 @@ package org.apache.doris.spark.rest; +import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES; -import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY; -import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER; -import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES; -import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import 
java.io.InputStreamReader; -import java.io.PrintWriter; -import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.HashMap; -import java.util.Base64; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.ArrayList; -import java.util.Set; -import java.util.HashSet; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.json.JsonMapper; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.cfg.SparkSettings; -import org.apache.doris.spark.exception.ConnectedFailedException; import org.apache.doris
(doris-flink-connector) branch master updated: [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 73636562 [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448) 73636562 is described below commit 73636562f1ce21211b2e6c3e678d4481d0eff4de Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Fri Jul 26 16:12:40 2024 +0800 [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448) --- .../flink/tools/cdc/mongodb/MongoDBSchema.java | 32 -- .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 11 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java index 41752c5e..08ec4509 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -22,14 +22,18 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.Map; public class MongoDBSchema extends SourceSchema { +private static final Logger LOG = LoggerFactory.getLogger(MongoDBSchema.class); public MongoDBSchema( ArrayList sampleData, @@ -73,17 +77,27 @@ public class MongoDBSchema extends SourceSchema { int existingPrecision = existingPrecisionAndScale.f0; int 
existingScale = existingPrecisionAndScale.f1; -Tuple2 currentPrecisionAndScale = -MongoDBType.getDecimalPrecisionAndScale(newDorisType); -int currentPrecision = currentPrecisionAndScale.f0; -int currentScale = currentPrecisionAndScale.f1; +try { +Tuple2 currentPrecisionAndScale = +MongoDBType.getDecimalPrecisionAndScale(newDorisType); +int currentPrecision = currentPrecisionAndScale.f0; +int currentScale = currentPrecisionAndScale.f1; -int newScale = Math.max(existingScale, currentScale); -int newIntegerPartSize = -Math.max(existingPrecision - existingScale, currentPrecision - currentScale); -int newPrecision = newIntegerPartSize + newScale; +int newScale = Math.max(existingScale, currentScale); +int newIntegerPartSize = +Math.max( +existingPrecision - existingScale, currentPrecision - currentScale); +int newPrecision = newIntegerPartSize + newScale; -return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; +return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; +} catch (DorisRuntimeException e) { +LOG.warn( +"Replace decimal type of field:{} failed, the newly type is:{}, rollback to existing type:{}", +fieldName, +newDorisType, +existingField.getTypeString()); +return existingField.getTypeString(); +} } return newDorisType; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java index 57f7f470..75dadde8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java @@ -44,4 +44,15 @@ public class MongoDBSchemaTest { String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)"); assertEquals("DECIMAL(15,8)", d); } + +@Test +public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws Exception 
{ +ArrayList documents = new ArrayList<>(); +documents.add(new Document("fields1", 1234567.66)); +documents.add(new Document("fields1", 1234567)); +d
(doris-flink-connector) branch master updated: [improve]cdc tools parameter removes leading and trailing spaces (#447)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 8eb2f136 [improve]cdc tools parameter removes leading and trailing spaces (#447) 8eb2f136 is described below commit 8eb2f136263b9975af857077665be3eed5fc28e9 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Jul 26 16:10:56 2024 +0800 [improve]cdc tools parameter removes leading and trailing spaces (#447) --- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 35 +- .../apache/doris/flink/tools/cdc/CdcToolsTest.java | 74 -- 2 files changed, 85 insertions(+), 24 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index b62f0f52..c91a0734 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -46,24 +46,25 @@ public class CdcTools { System.out.println("Input args: " + Arrays.asList(args) + ".\n"); String operation = args[0].toLowerCase(); String[] opArgs = Arrays.copyOfRange(args, 1, args.length); +MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); switch (operation) { case DatabaseSyncConfig.MYSQL_SYNC_DATABASE: -createMySQLSyncDatabase(opArgs); +createMySQLSyncDatabase(params); break; case DatabaseSyncConfig.ORACLE_SYNC_DATABASE: -createOracleSyncDatabase(opArgs); +createOracleSyncDatabase(params); break; case DatabaseSyncConfig.POSTGRES_SYNC_DATABASE: -createPostgresSyncDatabase(opArgs); +createPostgresSyncDatabase(params); break; case DatabaseSyncConfig.SQLSERVER_SYNC_DATABASE: -createSqlServerSyncDatabase(opArgs); +createSqlServerSyncDatabase(params); break; case 
DatabaseSyncConfig.MONGODB_SYNC_DATABASE: -createMongoDBSyncDatabase(opArgs); +createMongoDBSyncDatabase(params); break; case DatabaseSyncConfig.DB2_SYNC_DATABASE: -createDb2SyncDatabase(opArgs); +createDb2SyncDatabase(params); break; default: System.out.println("Unknown operation " + operation); @@ -71,8 +72,7 @@ public class CdcTools { } } -private static void createMySQLSyncDatabase(String[] opArgs) throws Exception { -MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); +private static void createMySQLSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.MYSQL_CONF)); Map mysqlMap = getConfigMap(params, DatabaseSyncConfig.MYSQL_CONF); Configuration mysqlConfig = Configuration.fromMap(mysqlMap); @@ -80,8 +80,7 @@ public class CdcTools { syncDatabase(params, databaseSync, mysqlConfig, SourceConnector.MYSQL); } -private static void createOracleSyncDatabase(String[] opArgs) throws Exception { -MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); +private static void createOracleSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.ORACLE_CONF)); Map oracleMap = getConfigMap(params, DatabaseSyncConfig.ORACLE_CONF); Configuration oracleConfig = Configuration.fromMap(oracleMap); @@ -89,8 +88,7 @@ public class CdcTools { syncDatabase(params, databaseSync, oracleConfig, SourceConnector.ORACLE); } -private static void createPostgresSyncDatabase(String[] opArgs) throws Exception { -MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); +private static void createPostgresSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.POSTGRES_CONF)); Map postgresMap = getConfigMap(params, DatabaseSyncConfig.POSTGRES_CONF); Configuration postgresConfig = Configuration.fromMap(postgresMap); @@ -98,8 +96,7 @@ public class CdcTools { 
syncDatabase(params, databaseSync, postgresConfig, SourceConnector.POSTGRES); } -private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception { -MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); +private static void createSqlServ
(doris-flink-connector) branch master updated: [Feature](cdc) add DB2 database sync (#316)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new f559ea2c [Feature](cdc) add DB2 database sync (#316) f559ea2c is described below commit f559ea2cda6d367260b73a209898aa4af08bf6de Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Fri Jul 26 10:44:26 2024 +0800 [Feature](cdc) add DB2 database sync (#316) --- flink-doris-connector/pom.xml | 12 ++ .../jsondebezium/JsonDebeziumChangeUtils.java | 4 + .../org/apache/doris/flink/tools/cdc/CdcTools.java | 13 ++ .../doris/flink/tools/cdc/DatabaseSyncConfig.java | 2 + .../doris/flink/tools/cdc/SourceConnector.java | 3 +- .../doris/flink/tools/cdc/db2/Db2DatabaseSync.java | 227 + .../flink/tools/cdc/db2/Db2DateConverter.java | 133 .../{SourceConnector.java => db2/Db2Schema.java} | 32 +-- .../apache/doris/flink/tools/cdc/db2/Db2Type.java | 94 + .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java| 104 ++ .../doris/flink/tools/cdc/db2/Db2TypeTest.java | 49 + 11 files changed, 660 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b6e90bea..2d7b2875 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -286,6 +286,18 @@ under the License. 
+ +org.apache.flink +flink-sql-connector-db2-cdc +${flink.sql.cdc.version} +provided + + +flink-shaded-guava +org.apache.flink + + + org.apache.flink flink-sql-connector-mongodb-cdc diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java index 571bacfb..492a7d29 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.db2.Db2Type; import org.apache.doris.flink.tools.cdc.mysql.MysqlType; import org.apache.doris.flink.tools.cdc.oracle.OracleType; import org.apache.doris.flink.tools.cdc.postgres.PostgresType; @@ -84,6 +85,9 @@ public class JsonDebeziumChangeUtils { case SQLSERVER: dorisTypeName = SqlServerType.toDorisType(dataType, length, scale); break; +case DB2: +dorisTypeName = Db2Type.toDorisType(dataType, length, scale); +break; default: String errMsg = sourceConnector + " not support " + dataType + " schema change."; throw new UnsupportedOperationException(errMsg); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 3ab38a19..b62f0f52 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; @@ -61,6 +62,9 @@ public class CdcTools { case DatabaseSyncConfig.MONGODB_SYNC_DATABASE: createMongoDBSyncDatabase(opArgs); break; +case DatabaseSyncConfig.DB2_SYNC_DATABASE: +createDb2SyncDatabase(opArgs); +break; default: System.out.println("Unknown operation " + operation); System.exit(1); @@ -112,6 +116,15 @@ public class CdcTools { syncDatabase(params, databaseSync, mongoConfig, SourceConnector.MONGODB); } +private static void create
(doris-flink-connector) branch master updated: [improve]Improve license ignored file paths (#436)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 22e5018c [improve]Improve license ignored file paths (#436) 22e5018c is described below commit 22e5018c4befa9bdd220da94557d2cb17c6505a8 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Jul 25 16:58:01 2024 +0800 [improve]Improve license ignored file paths (#436) --- .licenserc.yaml | 26 +++--- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/.licenserc.yaml b/.licenserc.yaml index c1f81090..60488398 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -5,32 +5,12 @@ header: paths-ignore: - 'dist' -- 'licenses' -- '**/*.md' -- 'LICENSE' +- 'LICENSE.txt' +- 'NOTICE.txt' - 'NOTICE' -- 'DISCLAIMER' -- '.clang-format' -- '.clang-format-ignore' -- '.gitattributes' - '.gitignore' -- '.gitmodules' +- '.github/PULL_REQUEST_TEMPLATE.md' - '.licenserc.yaml' -- '.rat-excludes' -- 'be/src/common/status.cpp' -- 'be/src/common/status.h' -- 'be/src/env/env.h' -- 'be/src/env/env_posix.cpp' -- '**/glibc-compatibility/**' -- '**/gutil/**' -- '**/test_data/**' -- '**/jmockit/**' -- '**/*.json' -- '**/*.dat' -- '**/*.svg' -- '**/*.md5' -- '**/*.patch' -- '**/*.log' - 'custom_env.sh.tpl' comment: on-failure - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [testcase](cdc) add e2e test for MySql to Doris (#445)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 3433da6e [testcase](cdc) add e2e test for MySql to Doris (#445) 3433da6e is described below commit 3433da6e06a2320fb43a43834c6caa87234531a4 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Thu Jul 25 16:57:31 2024 +0800 [testcase](cdc) add e2e test for MySql to Doris (#445) --- .../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 54 ++ 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index ab1dfe77..6a613841 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -62,6 +62,7 @@ public class MySQLDorisE2ECase extends DorisTestBase { private static final String TABLE_2 = "tbl2"; private static final String TABLE_3 = "tbl3"; private static final String TABLE_4 = "tbl4"; +private static final String TABLE_5 = "tbl5"; private static final String TABLE_SQL_PARSE = "tbl_sql_parse"; private static final MySQLContainer MYSQL_CONTAINER = @@ -93,10 +94,13 @@ public class MySQLDorisE2ECase extends DorisTestBase { JobClient jobClient = submitJob(); // wait 2 times checkpoint Thread.sleep(2); -List expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3"); +List expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5"); String sql = -"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1"; -String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, 
TABLE_3); +"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1"; +String query1 = +String.format( +sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE, +TABLE_5); checkResult(expected, query1, 2); // add incremental data @@ -306,10 +310,13 @@ public class MySQLDorisE2ECase extends DorisTestBase { // wait 2 times checkpoint Thread.sleep(2); -List expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3"); +List expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5"); String sql = -"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1"; -String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3); +"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1"; +String query1 = +String.format( +sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE, +TABLE_5); checkResult(expected, query1, 2); // add incremental data @@ -431,7 +438,7 @@ public class MySQLDorisE2ECase extends DorisTestBase { Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); -String includingTables = "tbl1|tbl2|tbl3"; +String includingTables = "tbl1|tbl2|tbl3|tbl5"; String excludingTables = ""; DatabaseSync databaseSync = new MysqlDatabaseSync(); databaseSync @@ -457,10 +464,13 @@ public class MySQLDorisE2ECase extends DorisTestBase { // wait 2 times checkpoint Thread.sleep(2); -List expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3"); +List expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5"); String sql = -"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1"; -String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3); +"select 
* from ( sel
(doris-flink-connector) branch master updated (3433da6e -> a547ad2a)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git from 3433da6e [testcase](cdc) add e2e test for MySql to Doris (#445) add a547ad2a [improve]Improve the hard code in CDC whole database synchronization (#443) No new revisions were added by this update. Summary of changes: .../org/apache/doris/flink/tools/cdc/CdcTools.java | 88 ++-- .../doris/flink/tools/cdc/DatabaseSyncConfig.java | 97 ++ .../doris/flink/tools/cdc/SourceConnector.java | 3 +- .../tools/cdc/mongodb/MongoDBDatabaseSync.java | 13 ++- .../tools/cdc/mongodb/MongoDateConverter.java | 7 +- .../tools/cdc/mysql/DateToStringConverter.java | 47 +++ .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 19 +++-- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 16 ++-- .../tools/cdc/oracle/OracleDateConverter.java | 9 +- .../tools/cdc/postgres/PostgresDatabaseSync.java | 16 ++-- .../tools/cdc/postgres/PostgresDateConverter.java | 25 -- .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 16 ++-- .../cdc/sqlserver/SqlServerDateConverter.java | 27 +++--- .../flink/tools/cdc/CdcMongoSyncDatabaseCase.java | 49 +-- .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 30 +++ .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 37 - .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 41 - .../tools/cdc/CdcSqlServerSyncDatabaseCase.java| 37 - 18 files changed, 345 insertions(+), 232 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [fix]Fix NPE that occurs when schemaChangeMode is not specified (#444)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 0fa19486 [fix]Fix NPE that occurs when schemaChangeMode is not specified (#444) 0fa19486 is described below commit 0fa19486bdef44c302de3a4fcaa7ed9e3224c433 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Jul 25 16:43:06 2024 +0800 [fix]Fix NPE that occurs when schemaChangeMode is not specified (#444) --- .../src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 4abe8c61..1e66dd4c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -37,6 +37,7 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.schema.SchemaChangeMode; import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; @@ -550,6 +551,10 @@ public abstract class DatabaseSync { } public DatabaseSync setSchemaChangeMode(String schemaChangeMode) { +if (org.apache.commons.lang3.StringUtils.isEmpty(schemaChangeMode)) { +this.schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName(); +return this; +} this.schemaChangeMode = schemaChangeMode.trim(); return this; } - To unsubscribe, e-mail: 
commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated (67ca9401 -> 102a13c8)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git from 67ca9401 [Improve] Update flink cdc version to 3.1.1 (#438) add 102a13c8 [improve]Improve the entry point for creating tableSchema (#439) No new revisions were added by this update. Summary of changes: .../apache/doris/flink/catalog/DorisCatalog.java | 19 ++- .../flink/catalog/doris/DorisSchemaFactory.java| 128 +++ .../flink/sink/schema/SQLParserSchemaManager.java | 33 ++--- .../jsondebezium/JsonDebeziumChangeUtils.java | 42 --- .../JsonDebeziumSchemaChangeImplV2.java| 32 ++--- .../jsondebezium/SQLParserSchemaChange.java| 6 +- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 39 ++ .../apache/doris/flink/tools/cdc/SourceSchema.java | 1 + .../catalog/doris/DorisSchemaFactoryTest.java | 140 + .../sink/schema/SQLParserSchemaManagerTest.java| 10 +- .../jsondebezium/TestJsonDebeziumChangeUtils.java | 43 --- .../jsondebezium/TestSQLParserSchemaChange.java| 18 +-- 12 files changed, 324 insertions(+), 187 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java delete mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch branch-for-flinkcdc2.x created (now 07afbcd7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch branch-for-flinkcdc2.x in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at 07afbcd7 [typo] A simple comment typo (#440) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch before-for-flinkcdc-3.1 created (now 07afbcd7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch before-for-flinkcdc-3.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at 07afbcd7 [typo] A simple comment typo (#440) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Improve] Update flink cdc version to 3.1.1 (#438)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 67ca9401 [Improve] Update flink cdc version to 3.1.1 (#438) 67ca9401 is described below commit 67ca9401ea8e991cd09cd98c9237c06c93305790 Author: wudi <676366...@qq.com> AuthorDate: Wed Jul 24 10:10:23 2024 +0800 [Improve] Update flink cdc version to 3.1.1 (#438) --- flink-doris-connector/pom.xml | 28 +++ .../DorisJsonDebeziumDeserializationSchema.java| 12 +++ .../tools/cdc/mongodb/MongoDBDatabaseSync.java | 16 - .../tools/cdc/mysql/DateToStringConverter.java | 3 +- .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 20 +-- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 34 +- .../tools/cdc/oracle/OracleDateConverter.java | 3 +- .../tools/cdc/postgres/PostgresDatabaseSync.java | 42 +++--- .../tools/cdc/postgres/PostgresDateConverter.java | 3 +- .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 38 ++-- .../cdc/sqlserver/SqlServerDateConverter.java | 3 +- .../apache/doris/flink/CDCSchemaChangeExample.java | 6 ++-- .../doris/flink/utils/DateToStringConverter.java | 3 +- 13 files changed, 116 insertions(+), 95 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 052180c4..b6e90bea 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -70,7 +70,7 @@ under the License. 1.6.2-SNAPSHOT 1.18.0 1.18 -2.4.2 +3.1.1 flink-python 0.16.0 13.0.0 @@ -93,6 +93,8 @@ under the License. 4.12 1.3 4.9 +8.0.26 +19.3.0.0 @@ -237,7 +239,7 @@ under the License. -com.ververica +org.apache.flink flink-sql-connector-mysql-cdc ${flink.sql.cdc.version} provided @@ -249,7 +251,7 @@ under the License. -com.ververica +org.apache.flink flink-sql-connector-oracle-cdc ${flink.sql.cdc.version} provided @@ -261,7 +263,7 @@ under the License. 
-com.ververica +org.apache.flink flink-sql-connector-postgres-cdc ${flink.sql.cdc.version} provided @@ -273,7 +275,7 @@ under the License. -com.ververica +org.apache.flink flink-sql-connector-sqlserver-cdc ${flink.sql.cdc.version} provided @@ -285,7 +287,7 @@ under the License. -com.ververica +org.apache.flink flink-sql-connector-mongodb-cdc ${flink.sql.cdc.version} @@ -297,6 +299,20 @@ under the License. + + +mysql +mysql-connector-java +${mysql.driver.version} +test + + +com.oracle.ojdbc +ojdbc8 +${ojdbc.version} +provided + + org.apache.flink flink-runtime-web diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java index d1d91545..b7e4575c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java @@ -19,6 +19,12 @@ package org.apache.doris.flink.tools.cdc.deserialize; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.util.Collector; import com.fasterxml.jackson.databind.JsonNode; @@ -26,12 +32,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
(doris-flink-connector) branch branch-for-flinkcdc2.x deleted (was 07afbcd7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch branch-for-flinkcdc2.x in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git was 07afbcd7 [typo] A simple comment typo (#440) The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch branch-for-flinkcdc-2.x created (now 07afbcd7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch branch-for-flinkcdc-2.x in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at 07afbcd7 [typo] A simple comment typo (#440) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch before-for-flinkcdc-3.1 deleted (was 07afbcd7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch before-for-flinkcdc-3.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git was 07afbcd7 [typo] A simple comment typo (#440) The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [typo] A simple comment typo (#440)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 07afbcd7 [typo] A simple comment typo (#440) 07afbcd7 is described below commit 07afbcd7b1e8f8fb7238cf21326317c9cbf4d3c1 Author: JasonLee <40521353+jasonleecod...@users.noreply.github.com> AuthorDate: Wed Jul 24 10:05:45 2024 +0800 [typo] A simple comment typo (#440) --- .../java/org/apache/doris/flink/table/DorisDynamicTableFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 5edefe6b..e8ac4dbf 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -281,7 +281,7 @@ public final class DorisDynamicTableFactory DorisConfigOptions.getStreamLoadProp(context.getCatalogTable().getOptions()); TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); -// create and return dynamic table source +// create and return dynamic table sink return new DorisDynamicTableSink( getDorisOptions(helper.getOptions()), getDorisReadOptions(helper.getOptions()), - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [improve]The sql_parser model of schema change support automatic table creation (#435)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new db76d2a3 [improve]The sql_parser model of schema change support automatic table creation (#435) db76d2a3 is described below commit db76d2a38d73d5c3f3dc2912aed4b270467fb074 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Jul 23 11:30:54 2024 +0800 [improve]The sql_parser model of schema change support automatic table creation (#435) --- .../doris/flink/catalog/doris/FieldSchema.java | 18 +++ .../doris/flink/catalog/doris/TableSchema.java | 27 .../flink/sink/schema/SQLParserSchemaManager.java | 155 +++-- .../jsondebezium/JsonDebeziumChangeUtils.java | 37 + .../jsondebezium/JsonDebeziumSchemaChange.java | 8 ++ .../JsonDebeziumSchemaChangeImplV2.java| 48 +-- .../jsondebezium/SQLParserSchemaChange.java| 32 - .../sink/schema/SQLParserSchemaManagerTest.java| 138 ++ .../jsondebezium/TestJsonDebeziumChangeUtils.java | 43 ++ .../TestJsonDebeziumSchemaChangeImplV2.java| 13 -- .../jsondebezium/TestSQLParserSchemaChange.java| 42 +- .../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 154 12 files changed, 641 insertions(+), 74 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java index 3d7f2765..a8d85e1f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java @@ -69,4 +69,22 @@ public class FieldSchema { public void setDefaultValue(String defaultValue) { this.defaultValue = defaultValue; } + +@Override +public String toString() { +return "FieldSchema{" ++ "name='" ++ name ++ '\'' ++ ", typeString='" ++ 
typeString ++ '\'' ++ ", defaultValue='" ++ defaultValue ++ '\'' ++ ", comment='" ++ comment ++ '\'' ++ '}'; +} } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java index 4cc9098f..3a47a044 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java @@ -106,4 +106,31 @@ public class TableSchema { public Integer getTableBuckets() { return tableBuckets; } + +@Override +public String toString() { +return "TableSchema{" ++ "database='" ++ database ++ '\'' ++ ", table='" ++ table ++ '\'' ++ ", tableComment='" ++ tableComment ++ '\'' ++ ", fields=" ++ fields ++ ", keys=" ++ String.join(",", keys) ++ ", model=" ++ model.name() ++ ", distributeKeys=" ++ String.join(",", distributeKeys) ++ ", properties=" ++ properties ++ ", tableBuckets=" ++ tableBuckets ++ '}'; +} } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java index 6f157cdc..5acedfc2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.sink.schema; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserUtil; @@ -27,22 +28,32 @@ import net.sf.jsqlparser.statement.alter.AlterExpression; import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType; import net.sf.jsqlparser.statement.alter.AlterOperation; i
(doris-flink-connector) branch master updated: [improve] optimize the batch stream load redirect logic (#437)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new a57f24b6 [improve] optimize the batch stream load redirect logic (#437) a57f24b6 is described below commit a57f24b67cc8957851e293761a0bd3407e631339 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jul 23 11:28:38 2024 +0800 [improve] optimize the batch stream load redirect logic (#437) --- .../doris/flink/sink/batch/DorisBatchStreamLoad.java | 8 +--- .../apache/doris/flink/sink/batch/DorisBatchWriter.java| 8 +++- .../doris/flink/sink/batch/TestDorisBatchStreamLoad.java | 14 +++--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index c5614c31..2dd7a50e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -102,12 +102,14 @@ public class DorisBatchStreamLoad implements Serializable { private BackendUtil backendUtil; private boolean enableGroupCommit; private boolean enableGzCompress; +private int subTaskId; public DorisBatchStreamLoad( DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions executionOptions, -LabelGenerator labelGenerator) { +LabelGenerator labelGenerator, +int subTaskId) { this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? 
new BackendUtil(dorisOptions.getBenodes()) @@ -154,6 +156,7 @@ public class DorisBatchStreamLoad implements Serializable { new ThreadPoolExecutor.AbortPolicy()); this.started = new AtomicBoolean(true); this.loadExecutorService.execute(loadAsyncExecutor); +this.subTaskId = subTaskId; } /** @@ -293,7 +296,6 @@ public class DorisBatchStreamLoad implements Serializable { if (enableGzCompress) { putBuilder.setEntity(new GzipCompressingEntity(entity)); } - Throwable resEx = new Throwable(); int retry = 0; while (retry <= executionOptions.getMaxRetries()) { @@ -351,7 +353,7 @@ public class DorisBatchStreamLoad implements Serializable { } private void refreshLoadUrl(String database, String table) { -hostPort = backendUtil.getAvailableBackend(); +hostPort = backendUtil.getAvailableBackend(subTaskId); loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java index 4b48436c..6fbde55d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java @@ -59,6 +59,7 @@ public class DorisBatchWriter private transient volatile Exception flushException = null; private String database; private String table; +private int subtaskId; public DorisBatchWriter( Sink.InitContext initContext, @@ -75,6 +76,7 @@ public class DorisBatchWriter this.table = tableInfo[1]; } LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); +this.subtaskId = initContext.getSubtaskId(); this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); this.labelGenerator = new LabelGenerator(labelPrefix, false); this.scheduledExecutorService = @@ -92,7 +94,11 @@ public class DorisBatchWriter public void initializeLoad() { 
this.batchStreamLoad = new DorisBatchStreamLoad( -dorisOptions, dorisReadOptions, executionOptions, labelGenerator); +dorisOptions, +dorisReadOptions, +executionOptions, +labelGenerator, +subtaskId); // when uploading data in streaming mode, we need to regularly detect whether there are // exceptions. scheduledExecutorService.scheduleWithFixedDelay( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-dori
(doris-flink-connector) branch master updated: [Improve] support gz compress in streamload (#434)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new d65c0223 [Improve] support gz compress in streamload (#434) d65c0223 is described below commit d65c0223362dc841a1db25cc11b3c487db7c13e6 Author: wudi <676366...@qq.com> AuthorDate: Mon Jul 22 15:49:26 2024 +0800 [Improve] support gz compress in streamload (#434) --- .../flink/sink/batch/DorisBatchStreamLoad.java | 9 + .../doris/flink/sink/writer/DorisStreamLoad.java | 10 + .../doris/flink/sink/writer/LoadConstants.java | 2 + .../apache/doris/flink/sink/DorisSinkITCase.java | 43 ++ 4 files changed, 64 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 375e4335..c5614c31 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -34,6 +34,7 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; +import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -65,6 +66,8 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; +import static 
org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE; +import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; @@ -98,6 +101,7 @@ public class DorisBatchStreamLoad implements Serializable { private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); private BackendUtil backendUtil; private boolean enableGroupCommit; +private boolean enableGzCompress; public DorisBatchStreamLoad( DorisOptions dorisOptions, @@ -128,6 +132,7 @@ public class DorisBatchStreamLoad implements Serializable { && !loadProps .getProperty(GROUP_COMMIT) .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE); +this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { @@ -285,6 +290,10 @@ public class DorisBatchStreamLoad implements Serializable { .addHiddenColumns(executionOptions.getDeletable()) .addProperties(executionOptions.getStreamLoadProp()); +if (enableGzCompress) { +putBuilder.setEntity(new GzipCompressingEntity(entity)); +} + Throwable resEx = new Throwable(); int retry = 0; while (retry <= executionOptions.getMaxRetries()) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 4cbcb431..060bccb5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -33,6 +33,7 @@ import 
org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.ResponseUtil; +import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -56,6 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.ResponseUtil.LA
svn commit: r70459 - in /dev/doris/flink-connector/1.6.2.1: ./ apache-doris-flink-connector-1.6.2.1-src.tar.gz apache-doris-flink-connector-1.6.2.1-src.tar.gz.asc apache-doris-flink-connector-1.6.2.1-
Author: diwu Date: Mon Jul 22 02:58:03 2024 New Revision: 70459 Log: commit for flink connector 1.6.2.1 Added: dev/doris/flink-connector/1.6.2.1/ dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz (with props) dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.asc (with props) dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.sha512 Added: dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz == Binary file - no diff available. Propchange: dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz -- svn:mime-type = application/x-gzip Added: dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.asc == Binary file - no diff available. Propchange: dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.sha512 == --- dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.sha512 (added) +++ dev/doris/flink-connector/1.6.2.1/apache-doris-flink-connector-1.6.2.1-src.tar.gz.sha512 Mon Jul 22 02:58:03 2024 @@ -0,0 +1 @@ +184a6003772123309e85a0b64007b4fd2de98a97692931626176e7aa5238978a78545f007c3d309f50f1e591ddae5984eb841a4a81f7b36c21eea48f70b3d23a apache-doris-flink-connector-1.6.2.1-src.tar.gz - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) 01/01: Commit for release 1.6.2.1
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to tag 1.6.2.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git commit b12cf6356e4ebb6973dee07e72c9434c6e4dbf9e Author: wudi <676366...@qq.com> AuthorDate: Mon Jul 22 10:39:08 2024 +0800 Commit for release 1.6.2.1 --- flink-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b5080fb0..cead1269 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -67,7 +67,7 @@ under the License. -1.6.2-SNAPSHOT +1.6.2.1 1.18.0 1.18 2.4.2 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) tag 1.6.2.1 created (now b12cf635)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.6.2.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at b12cf635 (commit) This tag includes the following new commits: new b12cf635 Commit for release 1.6.2.1 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch release-1.6.2.1 created (now fe88af97)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.6.2.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at fe88af97 [Fix] Fix the problem that the existing tables in Doris cannot be synchronized.(#425) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch 1.6.2.1 deleted (was fe88af97)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch 1.6.2.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git was fe88af97 [Fix] Fix the problem that the existing tables in Doris cannot be synchronized.(#425) The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch 1.6.2.1 created (now fe88af97)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch 1.6.2.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at fe88af97 [Fix] Fix the problem that the existing tables in Doris cannot be synchronized.(#425) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [fix]Fix the error of missing content returned by schema change response (#433)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 8ba89c44 [fix]Fix the error of missing content returned by schema change response (#433) 8ba89c44 is described below commit 8ba89c4488bb7b639566b519da58c698f1e13b2f Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jul 17 17:22:44 2024 +0800 [fix]Fix the error of missing content returned by schema change response (#433) --- .../doris/flink/sink/schema/SchemaChangeManager.java | 20 +--- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 27f2aece..c946bee7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.sink.schema; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.StringUtils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; @@ -198,12 +199,12 @@ public class SchemaChangeManager implements Serializable { httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpGet.setEntity( new StringEntity(objectMapper.writeValueAsString(params), charsetEncoding)); -String responseEntity = ""; -Map responseMap = handleResponse(httpGet, responseEntity); -return handleSchemaChange(responseMap, responseEntity); +String responseEntity = handleResponse(httpGet); +return 
handleSchemaChange(responseEntity); } -private boolean handleSchemaChange(Map responseMap, String responseEntity) { +private boolean handleSchemaChange(String responseEntity) throws JsonProcessingException { +Map responseMap = objectMapper.readValue(responseEntity, Map.class); String code = responseMap.getOrDefault("code", "-1").toString(); if (code.equals("0")) { return true; @@ -221,9 +222,8 @@ public class SchemaChangeManager implements Serializable { } LOG.info("Execute SQL: {}", ddl); HttpPost httpPost = buildHttpPost(ddl, database); -String responseEntity = ""; -Map responseMap = handleResponse(httpPost, responseEntity); -return handleSchemaChange(responseMap, responseEntity); +String responseEntity = handleResponse(httpPost); +return handleSchemaChange(responseEntity); } public HttpPost buildHttpPost(String ddl, String database) @@ -245,15 +245,13 @@ public class SchemaChangeManager implements Serializable { return httpPost; } -private Map handleResponse(HttpUriRequest request, String responseEntity) { +private String handleResponse(HttpUriRequest request) { try (CloseableHttpClient httpclient = HttpClients.createDefault()) { CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); final String reasonPhrase = response.getStatusLine().getReasonPhrase(); if (statusCode == 200 && response.getEntity() != null) { -responseEntity = EntityUtils.toString(response.getEntity()); -Map responseMap = objectMapper.readValue(responseEntity, Map.class); -return responseMap; +return EntityUtils.toString(response.getEntity()); } else { throw new DorisSchemaChangeException( "Failed to schemaChange, status: " - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Fix] unsupport when use doris catalog list views (#432)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new f92bc754 [Fix] unsupport when use doris catalog list views (#432) f92bc754 is described below commit f92bc7541448f1a946567f00bf06d915a2e65ad7 Author: wudi <676366...@qq.com> AuthorDate: Wed Jul 17 15:52:12 2024 +0800 [Fix] unsupport when use doris catalog list views (#432) --- .../src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java| 4 ++-- .../src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java | 2 ++ .../src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java index c56f64a0..a6e92158 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java @@ -202,7 +202,7 @@ public class DorisCatalog extends AbstractCatalog { @Override public List listViews(String databaseName) throws DatabaseNotExistException, CatalogException { -throw new UnsupportedOperationException(); +return Collections.emptyList(); } @Override @@ -425,7 +425,7 @@ public class DorisCatalog extends AbstractCatalog { @Override public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { -throw new UnsupportedOperationException(); +return false; } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index a8e3d7d6..bfca184e 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -47,6 +47,7 @@ import org.apache.doris.flink.catalog.doris.DorisType; import static org.apache.doris.flink.catalog.doris.DorisType.ARRAY; import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT; +import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT_UNSIGNED; import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN; import static org.apache.doris.flink.catalog.doris.DorisType.CHAR; import static org.apache.doris.flink.catalog.doris.DorisType.DATE; @@ -113,6 +114,7 @@ public class DorisTypeMapper { case VARCHAR: return DataTypes.VARCHAR(precision); case LARGEINT: +case BIGINT_UNSIGNED: case STRING: case JSONB: case JSON: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java index b2b3776c..7bc9fd0b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java @@ -24,6 +24,8 @@ public class DorisType { public static final String INT = "INT"; public static final String BIGINT = "BIGINT"; public static final String LARGEINT = "LARGEINT"; +// largeint is bigint unsigned in information_schema.COLUMNS +public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; public static final String FLOAT = "FLOAT"; public static final String DOUBLE = "DOUBLE"; public static final String DECIMAL = "DECIMAL"; - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Improve]Schema change parses ddl sql using jsqlparser framework (#422)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 13f1fcde [Improve]Schema change parses ddl sql using jsqlparser framework (#422) 13f1fcde is described below commit 13f1fcdeea2b7cf89e500539b8d888accf24c909 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jul 17 15:10:15 2024 +0800 [Improve]Schema change parses ddl sql using jsqlparser framework (#422) --- flink-doris-connector/pom.xml | 6 + .../doris/flink/catalog/doris/DorisSystem.java | 3 + .../flink/sink/schema/SQLParserSchemaManager.java | 218 + .../flink/sink/schema/SchemaChangeHelper.java | 5 +- .../doris/flink/sink/schema/SchemaChangeMode.java | 33 .../serializer/JsonDebeziumSchemaSerializer.java | 43 +++- .../jsondebezium/JsonDebeziumChangeContext.java| 4 +- .../jsondebezium/JsonDebeziumChangeUtils.java | 33 .../jsondebezium/JsonDebeziumSchemaChange.java | 69 +++ .../jsondebezium/JsonDebeziumSchemaChangeImpl.java | 11 +- .../JsonDebeziumSchemaChangeImplV2.java| 102 +++--- .../jsondebezium/SQLParserSchemaChange.java| 93 + .../org/apache/doris/flink/tools/cdc/CdcTools.java | 2 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 7 + .../sink/schema/SQLParserSchemaManagerTest.java| 206 +++ .../jsondebezium/TestSQLParserSchemaChange.java| 141 + .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 2 +- .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 +- .../tools/cdc/CdcSqlServerSyncDatabaseCase.java| 2 +- 20 files changed, 895 insertions(+), 92 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b5080fb0..052180c4 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -92,6 +92,7 @@ under the License. 
1.17.6 4.12 1.3 +4.9 @@ -354,6 +355,11 @@ under the License. ${flink.version} test + +com.github.jsqlparser +jsqlparser +${jsqlparser.version} + diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index ab26e308..0d33eb9f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -285,6 +285,9 @@ public class DorisSystem implements Serializable { } public static String identifier(String name) { +if (name.startsWith("`") && name.endsWith("`")) { +return name; +} return "`" + name + "`"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java new file mode 100644 index ..6f157cdc --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink.schema; + +import org.apache.flink.annotation.VisibleForTesting; + +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.alter.Alter; +import net.sf.jsqlparser.statement.alter.AlterExpression; +import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType; +import net.sf.jsqlparser.statement.alter.AlterOperation; +import net.sf.jsqlparser.statement.create
(doris) branch master updated: [fix](docker)Docker be register bug and add fe new interface recovery (#37335)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new e334e407975 [fix](docker)Docker be register bug and add fe new interface recovery (#37335) e334e407975 is described below commit e334e4079750461531c8237ebc79fcf2aa9e3c9d Author: FreeOnePlus <54164178+freeonep...@users.noreply.github.com> AuthorDate: Tue Jul 16 15:42:59 2024 +0800 [fix](docker)Docker be register bug and add fe new interface recovery (#37335) --- docker/runtime/be/resource/entry_point.sh | 2 +- docker/runtime/be/resource/init_be.sh | 68 +++-- docker/runtime/fe/resource/init_fe.sh | 238 +- 3 files changed, 255 insertions(+), 53 deletions(-) diff --git a/docker/runtime/be/resource/entry_point.sh b/docker/runtime/be/resource/entry_point.sh index 1ae418163fa..6e3dfaf3875 100755 --- a/docker/runtime/be/resource/entry_point.sh +++ b/docker/runtime/be/resource/entry_point.sh @@ -167,7 +167,7 @@ check_be_status() { if [[ $1 == true ]]; then docker_process_sql <<<"show frontends" | grep "[[:space:]]${MASTER_FE_IP}[[:space:]]" else - docker_process_sql <<<"show backends" | grep "[[:space:]]${CURRENT_BE_IP}[[:space:]]" | grep "[[:space:]]${CURRENT_BE_PORT}[[:space:]]" | grep "[[:space:]]true[[:space:]]" + docker_process_sql <<<"show backends" | grep "[[:space:]]${CURRENT_BE_IP}[[:space:]]" | grep "[[:space:]]${CURRENT_BE_PORT}[[:space:]]" fi be_join_status=$? 
if [[ "${be_join_status}" == 0 ]]; then diff --git a/docker/runtime/be/resource/init_be.sh b/docker/runtime/be/resource/init_be.sh index 42afd1f6754..f9269f3ee7e 100644 --- a/docker/runtime/be/resource/init_be.sh +++ b/docker/runtime/be/resource/init_be.sh @@ -72,23 +72,26 @@ show_be_args(){ doris_note "CURRENT_BE_IP " ${CURRENT_BE_IP} doris_note "CURRENT_BE_PORT " ${CURRENT_BE_PORT} doris_note "RUN_TYPE " ${RUN_TYPE} - doris_note "PRIORITY_NETWORKS " ${PRIORITY_NETWORKS} } # Execute sql script, passed via stdin # usage: docker_process_sql sql_script docker_process_sql() { - set +e - mysql -uroot -P9030 -h${MASTER_FE_IP} --comments "$@" 2>/dev/null +set +e +if [[ $RUN_TYPE == "ELECTION" || $RUN_TYPE == "ASSIGN" ]]; then +mysql -uroot -P9030 -h${MASTER_FE_IP} --comments "$@" 2>/dev/null +elif [[ $RUN_TYPE == "FQDN" ]]; then +mysql -uroot -P9030 -h${MASTER_NODE_NAME} --comments "$@" 2>/dev/null +fi } node_role_conf(){ - if [[ ${NODE_ROLE} == 'computation' ]]; then -doris_note "this node role is computation" -echo "be_node_role=computation" >>${DORIS_HOME}/be/conf/be.conf - else -doris_note "this node role is mix" - fi +if [[ ${NODE_ROLE} == 'computation' ]]; then +doris_note "this node role is computation" +echo "be_node_role=computation" >>${DORIS_HOME}/be/conf/be.conf +else +doris_note "this node role is mix" +fi } register_be_to_fe() { @@ -103,12 +106,25 @@ register_be_to_fe() { fi fi for i in {1..300}; do - docker_process_sql <<<"alter system add backend '${CURRENT_BE_IP}:${CURRENT_BE_PORT}'" + if [[ $RUN_TYPE == "ELECTION" || $RUN_TYPE == "ASSIGN" ]]; then + SQL="alter system add backend '${CURRENT_BE_IP}:${CURRENT_BE_PORT}';" + doris_note "Executing SQL: $SQL" + docker_process_sql <<<"$SQL" + elif [[ $RUN_TYPE == "FQDN" ]]; then + SQL="alter system add backend '${CURRENT_NODE_NAME}:${CURRENT_BE_PORT}';" + doris_note "Executing SQL: $SQL" + docker_process_sql <<<"$SQL" + fi register_be_status=$? 
if [[ $register_be_status == 0 ]]; then doris_note "BE successfully registered to FE!" is_fe_start=true return + else +check_be_status +if [[ $IS_BE_JOIN_STATUS == "true" ]]; then + return +fi fi if [[ $(( $i % 20 )) == 1 ]]; then doris_note "Register BE to FE is failed. retry." @@ -122,16 +138,23 @@ register_be_to_fe() { check_be_status() { set +e -local is_fe_start=false -for i in {1..300}; do +declare -g IS_FE_START_STATUS IS_BE_JOIN_STATUS +IS_FE_START_STATUS=false +IS_BE_JOIN_STATUS=false +for i in {1..100}; do if [[ $(($i % 20)) == 1 ]]; then doris_warn "start check be register stat
svn commit: r70325 - /dev/doris/kafka-connector/1.1.0-rc01/ /release/doris/kafka-connector/1.1.0-rc01/
Author: diwu Date: Tue Jul 16 02:14:48 2024 New Revision: 70325 Log: move doris kafka connector 1.1.0 to release Added: release/doris/kafka-connector/1.1.0-rc01/ - copied from r70324, dev/doris/kafka-connector/1.1.0-rc01/ Removed: dev/doris/kafka-connector/1.1.0-rc01/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
svn commit: r70324 - /dev/doris/flink-connector/1.6.2/ /release/doris/flink-connector/1.6.2/
Author: diwu Date: Tue Jul 16 02:14:37 2024 New Revision: 70324 Log: move doris flink connector 1.6.2 to release Added: release/doris/flink-connector/1.6.2/ - copied from r70323, dev/doris/flink-connector/1.6.2/ Removed: dev/doris/flink-connector/1.6.2/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Fix] Fix flink sql projection pushdown error (#428)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 38026fde [Fix] Fix flink sql projection pushdown error (#428) 38026fde is described below commit 38026fde2e1b010b0776df6d4e9cdbb2e49966b6 Author: wudi <676366...@qq.com> AuthorDate: Mon Jul 15 15:36:49 2024 +0800 [Fix] Fix flink sql projection pushdown error (#428) --- .../doris/flink/table/DorisDynamicTableSource.java | 22 - .../doris/flink/source/DorisSourceITCase.java | 36 ++ 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 9753361c..5827f879 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -94,6 +94,15 @@ public final class DorisDynamicTableSource String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); readOptions.setFilterQuery(filterQuery); } +if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { +String[] selectFields = +DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); +readOptions.setReadFields( +Arrays.stream(selectFields) +.map(item -> String.format("`%s`", item.trim().replace("`", ""))) +.collect(Collectors.joining(", "))); +} + if (readOptions.getUseOldApi()) { List dorisPartitions; try { @@ -199,14 +208,11 @@ public final class DorisDynamicTableSource @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); -if 
(StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { -String[] selectFields = -DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); -this.readOptions.setReadFields( -Arrays.stream(selectFields) -.map(item -> String.format("`%s`", item.trim().replace("`", ""))) -.collect(Collectors.joining(", "))); -} +String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); +this.readOptions.setReadFields( +Arrays.stream(selectFields) +.map(item -> String.format("`%s`", item.trim().replace("`", ""))) +.collect(Collectors.joining(", "))); } @VisibleForTesting diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index e13eeb36..027159db 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -49,6 +49,7 @@ public class DorisSourceITCase extends DorisTestBase { static final String TABLE_READ_TBL = "tbl_read_tbl"; static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api"; static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options"; +static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down"; @Test public void testSource() throws Exception { @@ -231,6 +232,41 @@ public class DorisSourceITCase extends DorisTestBase { Assert.assertArrayEquals(expected, actual.toArray()); } +@Test +public void testTableSourceFilterAndProjectionPushDown() throws Exception { +initializeTable(TABLE_READ_TBL_PUSH_DOWN); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + +String sourceDDL = +String.format( +"CREATE TABLE doris_source (" ++ " age INT" ++ ") WITH (" ++ " 'connector' = 
'doris'," +
(doris-flink-connector) branch master updated: [Fix](source) Fixed incorrect Map value reading when key/value is of DATE or DATETIME type (#419)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 2b595a78 [Fix](source) Fixed incorrect Map value reading when key/value is of DATE or DATETIME type (#419) 2b595a78 is described below commit 2b595a7819378650052a5beac4c0b707876b23ab Author: bingquanzhao AuthorDate: Mon Jul 15 10:16:33 2024 +0800 [Fix](source) Fixed incorrect Map value reading when key/value is of DATE or DATETIME type (#419) --- .../apache/doris/flink/serialization/RowBatch.java | 37 ++ .../doris/flink/serialization/TestRowBatch.java| 29 + 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index c7afe7f5..1a42b2b9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -40,7 +40,10 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.impl.DateDayReaderImpl; +import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl; import org.apache.arrow.vector.complex.impl.UnionMapReader; +import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.doris.flink.exception.DorisException; @@ -105,6 +108,7 @@ public class RowBatch { private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); private final DateTimeFormatter dateFormatter 
= DateTimeFormatter.ofPattern("-MM-dd"); +private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault(); public List getRowBatch() { return rowBatch; @@ -454,7 +458,11 @@ public class RowBatch { reader.setPosition(rowIndex); Map mapValue = new HashMap<>(); while (reader.next()) { -mapValue.put(reader.key().readObject().toString(), reader.value().readObject()); +FieldReader keyReader = reader.key(); +FieldReader valueReader = reader.value(); +Object mapKeyObj = handleMapFieldReader(keyReader); +Object mapValueObj = handleMapFieldReader(valueReader); +mapValue.put(mapKeyObj.toString(), mapValueObj); } addValueToRow(rowIndex, mapValue); break; @@ -478,6 +486,16 @@ public class RowBatch { return true; } +private Object handleMapFieldReader(FieldReader reader) { +if (reader instanceof TimeStampMicroReaderImpl) { +return longToLocalDateTime(reader.readLong()); +} +if (reader instanceof DateDayReaderImpl) { +return LocalDate.ofEpochDay(((Integer) reader.readObject()).longValue()); +} +return reader.readObject(); +} + @VisibleForTesting public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) { TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector; @@ -485,16 +503,21 @@ public class RowBatch { return null; } long time = vector.get(rowIndex); +return longToLocalDateTime(time); +} + +@VisibleForTesting +public static LocalDateTime longToLocalDateTime(long time) { Instant instant; -if (time / 100L == 0) { // datetime(0) +// Determine the timestamp accuracy and process it +if (time < 10_000_000_000L) { // Second timestamp instant = Instant.ofEpochSecond(time); -} else if (time / 10L == 0) { // datetime(3) +} else if (time < 10_000_000_000_000L) { // milli second instant = Instant.ofEpochMilli(time); -} else { // datetime(6) -instant = Instant.ofEpochSecond(time / 100, time % 100 * 1000); +} else { // micro second +instant = Instant.ofEpochSecond(time / 1_000_000, (time % 1_000_000) * 1_000); } -LocalDateTime dateTime = 
LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); -return dateTime; +return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID); } @VisibleForTesting diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/
(doris-flink-connector) branch master updated: [Fix] Fix the problem that the existing tables in Doris cannot be synchronized.(#425)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new fe88af97 [Fix] Fix the problem that the existing tables in Doris cannot be synchronized.(#425) fe88af97 is described below commit fe88af97a54a9bc792a4291fb80a95f5fe15e38b Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Mon Jul 15 10:04:50 2024 +0800 [Fix] Fix the problem that the existing tables in Doris cannot be synchronized.(#425) --- .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java| 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index a4d0511b..5cea70f9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -152,13 +152,15 @@ public abstract class DatabaseSync { // Calculate the mapping relationship between upstream and downstream tables tableMapping.put( schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable)); -if (tryCreateTableIfAbsent( +tryCreateTableIfAbsent( dorisSystem, targetDb, dorisTable, schema, tableBucketsMap, -tablesWithBucketsAssigned)) { +tablesWithBucketsAssigned); + +if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) { dorisTables.add(Tuple2.of(targetDb, dorisTable)); } } @@ -463,7 +465,7 @@ public abstract class DatabaseSync { } } -private boolean tryCreateTableIfAbsent( +private void tryCreateTableIfAbsent( DorisSystem dorisSystem, String targetDb, String dorisTable, @@ -480,12 +482,10 @@ public abstract class DatabaseSync { } try { dorisSystem.createTable(dorisSchema); -return true; } catch 
(Exception ex) { handleTableCreationFailure(ex); } } -return false; } private void handleTableCreationFailure(Exception ex) throws DorisSystemException { - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Fix] Fix Oracle cdb and pdb model unable to create tables (#423)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new d7b20a4e [Fix] Fix Oracle cdb and pdb model unable to create tables (#423) d7b20a4e is described below commit d7b20a4e0102983e7c2cca6e1a2ed5fb1ba9b0bc Author: wudi <676366...@qq.com> AuthorDate: Thu Jul 11 17:26:09 2024 +0800 [Fix] Fix Oracle cdb and pdb model unable to create tables (#423) --- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 + 1 file changed, 21 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 360351e4..beb6a677 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -44,6 +44,7 @@ import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -63,6 +64,7 @@ public class OracleDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class); private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s"; +private static final String PDB_KEY = "debezium.database.pdb.name"; public OracleDatabaseSync() throws SQLException { super(); @@ -108,9 +110,11 @@ public class OracleDatabaseSync extends DatabaseSync { public List getSchemaList() throws Exception { String databaseName = config.get(OracleSourceOptions.DATABASE_NAME); String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME); + List schemaList = new ArrayList<>(); 
LOG.info("database-name {}, schema-name {}", databaseName, schemaName); try (Connection conn = getConnection()) { +setSessionToPdb(conn); DatabaseMetaData metaData = conn.getMetaData(); try (ResultSet tables = metaData.getTables(databaseName, schemaName, "%", new String[] {"TABLE"})) { @@ -134,6 +138,23 @@ public class OracleDatabaseSync extends DatabaseSync { return schemaList; } +private void setSessionToPdb(Connection conn) throws SQLException { +String pdbName = null; +for (Map.Entry entry : config.toMap().entrySet()) { +String key = entry.getKey(); +if (key.equals(PDB_KEY)) { +pdbName = entry.getValue(); +break; +} +} +if (!StringUtils.isNullOrWhitespaceOnly(pdbName)) { +LOG.info("Found pdb name in config, set session to pdb to {}", pdbName); +try (Statement statement = conn.createStatement()) { +statement.execute("alter session set container=" + pdbName); +} +} +} + @Override public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { Properties debeziumProperties = new Properties(); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
svn commit: r70205 - in /dev/doris/kafka-connector: ./ 1.1.0-rc01/ 1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz 1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.asc 1.1.0-rc01/apac
Author: diwu Date: Wed Jul 10 02:43:59 2024 New Revision: 70205 Log: release doris kafka connector 1.1.0 Added: dev/doris/kafka-connector/ dev/doris/kafka-connector/1.1.0-rc01/ dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz (with props) dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.asc (with props) dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.sha512 Added: dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz == Binary file - no diff available. Propchange: dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz -- svn:mime-type = application/x-gzip Added: dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.asc == Binary file - no diff available. Propchange: dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.sha512 == --- dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.sha512 (added) +++ dev/doris/kafka-connector/1.1.0-rc01/apache-doris-kafka-connector-1.1.0-src.tar.gz.sha512 Wed Jul 10 02:43:59 2024 @@ -0,0 +1 @@ +bdd50f5b8a43a5db01ff5414f6681d9477dad3888e9d988e2e8ba1ad45ef16a20f0c2f6883d63eb06b1b85e5416a8ca82a08c6998fd387e5be8b9feb94fbbec7 apache-doris-kafka-connector-1.1.0-src.tar.gz - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) tag 1.1.0-rc01 created (now 17422f7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.1.0-rc01 in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git at 17422f7 (commit) This tag includes the following new commits: new 17422f7 Commit for release 1.1.0 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) 01/01: Commit for release 1.1.0
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to tag 1.1.0-rc01 in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git commit 17422f70c594e58cdad018a14118aa666e2224d2 Author: wudi <676366...@qq.com> AuthorDate: Wed Jul 10 10:21:13 2024 +0800 Commit for release 1.1.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fd99b7d..7dd65ed 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ -1.0.0-SNAPSHOT +1.1.0 3.10.1 3.3.0 3.2.1 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch release-1.1.0 created (now d2a3ac3)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git at d2a3ac3 [e2e](test) add partial column update and debezium ingestion e2e case (#38) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [e2e](test) add partial column update and debezium ingestion e2e case (#38)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new d2a3ac3 [e2e](test) add partial column update and debezium ingestion e2e case (#38) d2a3ac3 is described below commit d2a3ac3479cda9530ff3e9cd56d8973e581b7ea1 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Jul 10 10:04:31 2024 +0800 [e2e](test) add partial column update and debezium ingestion e2e case (#38) --- .../e2e/sink/AbstractKafka2DorisSink.java | 12 + .../e2e/sink/stringconverter/StringMsgE2ETest.java | 61 ++ .../resources/e2e/string_converter/full_types.json | 22 .../full_types_debezium_ingestion.sql | 59 + .../string_converter/insert_partial_update_tab.sql | 3 ++ .../e2e/string_converter/partial_update.json | 24 + .../e2e/string_converter/partial_update_tab.sql| 15 ++ 7 files changed, 196 insertions(+) diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java index c4c7fe4..d50556f 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java @@ -108,6 +108,18 @@ public abstract class AbstractKafka2DorisSink { LOG.info("Create doris table successfully. sql={}", sql); } +protected void insertTable(String sql) { +LOG.info("Will insert data to Doris table. SQL: {}", sql); +try { +Statement statement = getJdbcConnection().createStatement(); +int rowCount = statement.executeUpdate(sql); +LOG.info("Inserted {} item data into the Doris table.", rowCount); +} catch (SQLException e) { +throw new DorisException("Failed to insert data to Doris table.", e); +} +LOG.info("Data insertion to Doris table was successful. 
SQL: {}", sql); +} + private static void initDorisBase() { if (Objects.nonNull(dorisContainerService)) { return; diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 227a203..3143461 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -125,6 +125,65 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expected, query, 3); } +@Test +public void testPartialUpdate() throws Exception { + initialize("src/test/resources/e2e/string_converter/partial_update.json"); +String topic = "partial_update_test"; +String msg1 = + "{\"id\":1,\"col1\":\"after_update_col1_1\",\"col2\":\"after_update_col2_1\"}"; +String msg2 = + "{\"id\":2,\"col1\":\"after_update_col1_2\",\"col2\":\"after_update_col2_2\"}"; + +produceMsg2Kafka(topic, msg1); +produceMsg2Kafka(topic, msg2); + +String tableSql = + loadContent("src/test/resources/e2e/string_converter/partial_update_tab.sql"); +String insertSql = +loadContent( + "src/test/resources/e2e/string_converter/insert_partial_update_tab.sql"); +createTable(tableSql); +Thread.sleep(2000); +insertTable(insertSql); +Thread.sleep(15000); +kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + +String table = dorisOptions.getTopicMapTable(topic); +List expected = +Arrays.asList( + "1,after_update_col1_1,after_update_col2_1,before_update_col3_1", + "2,after_update_col1_2,after_update_col2_2,before_update_col3_2"); +Thread.sleep(1); +String query = +String.format("select id,col1,col2,col3 from %s.%s order by id", database, table); +checkResult(expected, query, 4); +} + +@Test +public void testDebeziumIngestionFullTypes() throws Exception { 
+initialize("src/test/resources/e2e/string_converter/full_types.json"); +String topic = "full_types"; +String msg1 = + "{\&q
(doris-kafka-connector) branch master updated: [feature]support stream load with group commit mode (#35)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 4ace2e6 [feature]support stream load with group commit mode (#35) 4ace2e6 is described below commit 4ace2e65a7c0bf383c14010e7a4823e5fd5e0aaf Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jul 9 15:13:16 2024 +0800 [feature]support stream load with group commit mode (#35) --- .../doris/kafka/connector/cfg/DorisOptions.java| 27 +++ .../kafka/connector/utils/ConfigCheckUtils.java| 27 +++ .../kafka/connector/writer/LoadConstants.java | 4 ++ .../connector/writer/load/DorisStreamLoad.java | 23 +++-- .../GroupCommitMode.java} | 32 ++--- .../cfg/TestDorisSinkConnectorConfig.java | 52 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 -- .../string_converter/group_commit_connector.json | 23 + .../e2e/string_converter/group_commit_tab.sql | 13 + 9 files changed, 234 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 69cbb80..e8c1933 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -55,8 +55,9 @@ public class DorisOptions { private boolean autoRedirect = true; private int requestReadTimeoutMs; private int requestConnectTimeoutMs; +private boolean enableGroupCommit; /** Properties for the StreamLoad. 
*/ -private final Properties streamLoadProp = new Properties(); +private final Properties streamLoadProp; @Deprecated private String labelPrefix; private final String databaseTimeZone; @@ -113,25 +114,31 @@ public class DorisOptions { this.requestReadTimeoutMs = Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS)); } -getStreamLoadPropFromConfig(config); +this.streamLoadProp = getStreamLoadPropFromConfig(config); +this.enableGroupCommit = +ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC()); } -private void getStreamLoadPropFromConfig(Map config) { -setStreamLoadDefaultValues(); +private Properties getStreamLoadPropFromConfig(Map config) { +Properties properties = new Properties(); +properties.putAll(getStreamLoadDefaultValues()); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) { String subKey = entry.getKey() .substring( DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length()); -streamLoadProp.put(subKey, entry.getValue()); +properties.put(subKey, entry.getValue()); } } +return properties; } -private void setStreamLoadDefaultValues() { -streamLoadProp.setProperty("format", "json"); -streamLoadProp.setProperty("read_json_by_line", "true"); +private Properties getStreamLoadDefaultValues() { +Properties properties = new Properties(); +properties.setProperty("format", "json"); +properties.setProperty("read_json_by_line", "true"); +return properties; } public String getName() { @@ -182,6 +189,10 @@ public class DorisOptions { return enable2PC; } +public boolean enableGroupCommit() { +return enableGroupCommit; +} + public Map getTopicMap() { return topicMap; } diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index ca8ec31..51b8b06 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -19,8 +19,11 @@ package org.apache.doris.kafka.connector.utils; +import static org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS; + import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.converter.ConverterMode; @@ -28,6 +31,8 @@ import org.apache.doris.kafka.connector.converter.schema.SchemaEvoluti
svn commit: r70190 - in /dev/doris/flink-connector/1.6.2: ./ apache-doris-flink-connector-1.6.2-src.tar.gz apache-doris-flink-connector-1.6.2-src.tar.gz.asc apache-doris-flink-connector-1.6.2-src.tar.
Author: diwu Date: Tue Jul 9 06:54:34 2024 New Revision: 70190 Log: add flink connector 1.6.2 Added: dev/doris/flink-connector/1.6.2/ dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz (with props) dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.asc (with props) dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.sha512 Added: dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz == Binary file - no diff available. Propchange: dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz -- svn:mime-type = application/x-gzip Added: dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.asc == Binary file - no diff available. Propchange: dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.sha512 == --- dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.sha512 (added) +++ dev/doris/flink-connector/1.6.2/apache-doris-flink-connector-1.6.2-src.tar.gz.sha512 Tue Jul 9 06:54:34 2024 @@ -0,0 +1 @@ +daf5cc37cd072e5667bfd09fe08b7f0d9abe9a346b9fd6d7488149ad6d105d2a944633befa1661e2cc3779aac56f2c09c298b5954ed48efc9a88d6689150cc01 apache-doris-flink-connector-1.6.2-src.tar.gz - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) tag 1.6.2 created (now bf624692)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.6.2 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at bf624692 (commit) This tag includes the following new commits: new bf624692 Commit for release 1.6.2 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) 01/01: Commit for release 1.6.2
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to tag 1.6.2 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git commit bf6246929a097cb9f46490ced31f3bc4d82f8e49 Author: wudi <676366...@qq.com> AuthorDate: Tue Jul 9 14:25:50 2024 +0800 Commit for release 1.6.2 --- flink-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b5080fb0..5fad8e0f 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -67,7 +67,7 @@ under the License. -1.6.2-SNAPSHOT +1.6.2 1.18.0 1.18 2.4.2 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch release-1.6.2 created (now e8ae64ed)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.6.2 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at e8ae64ed [Chore] Update Version Code (#417) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Chore] Update Version Code (#417)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new e8ae64ed [Chore] Update Version Code (#417) e8ae64ed is described below commit e8ae64ed50116e1046e6ee3379a6c7065553050d Author: wudi <676366...@qq.com> AuthorDate: Tue Jul 9 14:09:36 2024 +0800 [Chore] Update Version Code (#417) --- flink-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 571f63f1..b5080fb0 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -67,7 +67,7 @@ under the License. -1.6.1-SNAPSHOT +1.6.2-SNAPSHOT 1.18.0 1.18 2.4.2 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated (8eb9a1a0661 -> 110329c548e)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 8eb9a1a0661 [Fix]check nullptr when log thread num (#37263) add 110329c548e [Fix](case) add log for flink connector case download file (#37202) No new revisions were added by this update. Summary of changes: .../flink_connector_syncdb.groovy | 21 - 1 file changed, 16 insertions(+), 5 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [feature](cdc) add ignore-incompatible option (#371)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 060e13da [feature](cdc) add ignore-incompatible option (#371) 060e13da is described below commit 060e13dad0b7a7c94fee1c397739f0df29da1e2b Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Jul 3 15:40:26 2024 +0800 [feature](cdc) add ignore-incompatible option (#371) --- .../doris/flink/catalog/doris/DorisSystem.java | 3 +- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 2 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 65 ++ .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 2 + .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 2 + .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 + .../tools/cdc/CdcSqlServerSyncDatabaseCase.java| 2 + 7 files changed, 64 insertions(+), 14 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 31d32e01..ab26e308 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -281,8 +281,7 @@ public class DorisSystem implements Serializable { } private static List identifier(List name) { -List result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); -return result; +return name.stream().map(DorisSystem::identifier).collect(Collectors.toList()); } public static String identifier(String name) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 7443ef8a..38b942ea 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -132,6 +132,7 @@ public class CdcTools { String multiToOneTarget = params.get("multi-to-one-target"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); +boolean ignoreIncompatible = params.has("ignore-incompatible"); boolean singleSink = params.has("single-sink"); Preconditions.checkArgument(params.has("sink-conf")); @@ -155,6 +156,7 @@ public class CdcTools { .setTableConfig(tableMap) .setCreateTableOnly(createTableOnly) .setSingleSink(singleSink) +.setIgnoreIncompatible(ignoreIncompatible) .create(); databaseSync.build(); if (StringUtils.isNullOrWhitespaceOnly(jobName)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 691eaafa..a4d0511b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -34,6 +34,7 @@ import org.apache.doris.flink.cfg.DorisConnectionOptions; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; @@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.sql.Connection; import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -71,6 +73,7 @@ public 
abstract class DatabaseSync { protected Map tableConfig = new HashMap<>(); protected Configuration sinkConfig; protected boolean ignoreDefaultValue; +protected boolean ignoreIncompatible; public StreamExecutionEnvironment env; private boolean createTableOnly = false; @@ -128,7 +131,9 @@ public abstract class DatabaseSync { if (tableConfig.containsKey("table-buckets")) { tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets")); } -Set bucketsTable = new HashSet<>(); + +// Set of table names that have assigned bucket numb
(doris-flink-connector) branch master updated: (Fix)[source] Fix the problem of multiple fenodes when flink reads Doris (#416)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 9d1fb6bc (Fix)[source] Fix the problem of multiple fenodes when flink reads Doris (#416) 9d1fb6bc is described below commit 9d1fb6bcf381a2c58f046914c67a1fa9efa5d967 Author: wudi <676366...@qq.com> AuthorDate: Wed Jul 3 14:34:53 2024 +0800 (Fix)[source] Fix the problem of multiple fenodes when flink reads Doris (#416) --- .../src/main/java/org/apache/doris/flink/rest/RestService.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index c7304f9d..1dbb1fde 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -277,8 +277,9 @@ public class RestService implements Serializable { List nodes = Arrays.asList(feNodes.split(",")); Collections.shuffle(nodes); for (String feNode : nodes) { -if (BackendUtil.tryHttpConnection(feNode)) { -return feNode; +String host = feNode.trim(); +if (BackendUtil.tryHttpConnection(host)) { +return host; } } throw new DorisRuntimeException( @@ -548,7 +549,7 @@ public class RestService implements Serializable { String queryPlanUri = String.format( QUERY_PLAN_API, -options.getFenodes(), +randomEndpoint(options.getFenodes(), logger), tableIdentifier[0], tableIdentifier[1]); HttpPost httpPost = new HttpPost(queryPlanUri); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [Improve]Improve the unit test case of DorisWriter (#34)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 7bf7c17 [Improve]Improve the unit test case of DorisWriter (#34) 7bf7c17 is described below commit 7bf7c171a5add364622f292e70e2119fb894d686 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Jul 2 16:26:38 2024 +0800 [Improve]Improve the unit test case of DorisWriter (#34) --- .../kafka/connector/writer/TestCopyIntoWriter.java | 22 +- .../connector/writer/TestStreamLoadWriter.java | 21 - 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java index 302b9ea..60bcd81 100644 --- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java +++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java @@ -32,7 +32,6 @@ import java.util.Properties; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider; -import org.apache.doris.kafka.connector.exception.CopyLoadException; import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor; import org.apache.doris.kafka.connector.writer.load.CopyLoad; import org.apache.kafka.connect.sink.SinkRecord; @@ -67,22 +66,14 @@ public class TestCopyIntoWriter { dorisOptions = new DorisOptions((Map) props); } -@Test(expected = CopyLoadException.class) +@Test public void fetchOffset() { -DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class); -dorisWriter = -new CopyIntoWriter( -"test5", -0, -dorisOptions, -new JdbcConnectionProvider(dorisOptions), -dorisConnectMonitor); +dorisWriter 
= mockCopyIntoWriter(new String[] {}); dorisWriter.fetchOffset(); Assert.assertEquals(-1l, dorisWriter.getOffsetPersistedInDoris().longValue()); } -@Test -public void fetchOffsetTest() { +private CopyIntoWriter mockCopyIntoWriter(String[] listLoadFiles) { DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class); CopyIntoWriter copyIntoWriter = spy( @@ -93,7 +84,12 @@ public class TestCopyIntoWriter { new JdbcConnectionProvider(dorisOptions), dorisConnectMonitor)); doReturn(Arrays.asList(listLoadFiles)).when(copyIntoWriter).listLoadFiles(); -dorisWriter = copyIntoWriter; +return copyIntoWriter; +} + +@Test +public void fetchOffsetTest() { +dorisWriter = mockCopyIntoWriter(listLoadFiles); dorisWriter.fetchOffset(); System.out.println(dorisWriter.getOffsetPersistedInDoris().longValue()); Assert.assertEquals(168172036, dorisWriter.getOffsetPersistedInDoris().longValue()); diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java index 7e44a2d..ea54211 100644 --- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java +++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java @@ -36,7 +36,6 @@ import java.util.Properties; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider; -import org.apache.doris.kafka.connector.exception.StreamLoadException; import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor; import org.apache.doris.kafka.connector.writer.commit.DorisCommittable; import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad; @@ -82,22 +81,14 @@ public class TestStreamLoadWriter { "VISIBLE"); } -@Test(expected = StreamLoadException.class) +@Test public void fetchOffset() { -DorisConnectMonitor dorisConnectMonitor = 
mock(DorisConnectMonitor.class); -dorisWriter = -new StreamLoadWriter( -"avro-complex10", -2, -dorisOptions, -new JdbcConnectionProvider(dorisOptions), -dorisConnectMonitor); +dorisWriter = mockStreamLoadWriter(new HashMap<>()); dorisWriter.fetch
(doris-kafka-connector) branch master updated: [Fix]Fix the E2E test stream load data is not sink to doris (#37)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new ec4ce3d [Fix]Fix the E2E test stream load data is not sink to doris (#37) ec4ce3d is described below commit ec4ce3d04ee2cf0aaf93f1ff9af1ef0aa659d0b2 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Jul 2 16:26:11 2024 +0800 [Fix]Fix the E2E test stream load data is not sink to doris (#37) --- .../e2e/kafka/KafkaContainerServiceImpl.java | 2 +- .../e2e/sink/stringconverter/StringMsgE2ETest.java | 23 -- .../e2e/string_converter/string_msg_connector.json | 2 +- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java index 083cdb2..4e38ab3 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java @@ -214,7 +214,7 @@ public class KafkaContainerServiceImpl implements KafkaContainerService { // The current thread sleeps for 10 seconds so that connect can consume messages to doris in // time. 
-Thread.sleep(1); +Thread.sleep(6); } @Override diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 9ab8891..cd3f455 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -33,8 +33,11 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StringMsgE2ETest extends AbstractStringE2ESinkTest { +private static final Logger LOG = LoggerFactory.getLogger(StringMsgE2ETest.class); private static String connectorName; private static String jsonMsgConnectorContent; private static DorisOptions dorisOptions; @@ -80,12 +83,20 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { String table = dorisOptions.getTopicMapTable(topic); Statement statement = getJdbcConnection().createStatement(); -ResultSet resultSet = statement.executeQuery("select * from " + database + "." + table); -if (resultSet.next()) { -Assert.assertEquals(1, resultSet.getString("id")); -Assert.assertEquals("zhangsan", resultSet.getString("name")); -Assert.assertEquals(12, resultSet.getString("12")); -} +String querySql = "select * from " + database + "." + table; +LOG.info("start to query result from doris. 
sql={}", querySql); +ResultSet resultSet = statement.executeQuery(querySql); + +Assert.assertTrue(resultSet.next()); + +int id = resultSet.getInt("id"); +String name = resultSet.getString("name"); +int age = resultSet.getInt("age"); +LOG.info("Query result is id={}, name={}, age={}", id, name, age); + +Assert.assertEquals(1, id); +Assert.assertEquals("zhangsan", name); +Assert.assertEquals(12, age); } @AfterClass diff --git a/src/test/resources/e2e/string_converter/string_msg_connector.json b/src/test/resources/e2e/string_converter/string_msg_connector.json index 77340ea..dd994cc 100644 --- a/src/test/resources/e2e/string_converter/string_msg_connector.json +++ b/src/test/resources/e2e/string_converter/string_msg_connector.json @@ -5,7 +5,7 @@ "topics":"string_test", "tasks.max":"1", "doris.topic2table.map": "string_test:string_msg_tab", -"buffer.count.records":"10", +"buffer.count.records":"1", "buffer.flush.time":"120", "buffer.size.bytes":"1000", "doris.urls":"127.0.0.1", - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [improve]improve group commit logic (#413)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new ba21aabd [improve]improve group commit logic (#413) ba21aabd is described below commit ba21aabd0be664141e44581c5d16d97e9ff5467f Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jul 2 11:26:40 2024 +0800 [improve]improve group commit logic (#413) --- .../flink/sink/batch/DorisBatchStreamLoad.java | 14 +- .../doris/flink/sink/writer/DorisStreamLoad.java | 44 +++ .../doris/flink/sink/writer/LoadConstants.java | 1 + .../apache/doris/flink/sink/DorisSinkITCase.java | 51 ++ 4 files changed, 99 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index fbc6daa0..375e4335 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -68,6 +68,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -122,7 +123,11 @@ public class DorisBatchStreamLoad implements Serializable { LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) .getBytes(); } -this.enableGroupCommit = 
loadProps.containsKey(GROUP_COMMIT); +this.enableGroupCommit = +loadProps.containsKey(GROUP_COMMIT) +&& !loadProps +.getProperty(GROUP_COMMIT) +.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { @@ -283,7 +288,12 @@ public class DorisBatchStreamLoad implements Serializable { Throwable resEx = new Throwable(); int retry = 0; while (retry <= executionOptions.getMaxRetries()) { -LOG.info("stream load started for {} on host {}", label, hostPort); +if (enableGroupCommit) { +LOG.info("stream load started with group commit on host {}", hostPort); +} else { +LOG.info("stream load started for {} on host {}", label, hostPort); +} + try (CloseableHttpClient httpClient = httpClientBuilder.build()) { try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) { int statusCode = response.getStatusLine().getStatusCode(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 676de3df..4cbcb431 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -59,6 +59,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -131,7 +132,11 @@ public class DorisStreamLoad implements Serializable { LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) .getBytes(); } -enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT); +this.enableGroupCommit = +streamLoadProp.containsKey(GROU
(doris-flink-connector) branch master updated: [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new a24898c5 [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410) a24898c5 is described below commit a24898c5d1952dea50a9e6c1b2d2c41b8bc00b39 Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Tue Jul 2 11:07:40 2024 +0800 [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410) --- .../flink/tools/cdc/mongodb/MongoDBDatabaseSync.java | 5 - .../mongodb/serializer/MongoJsonDebeziumDataChange.java | 16 ++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index 7c0b6706..fe7f33d0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -133,7 +133,8 @@ public class MongoDBDatabaseSync extends DatabaseSync { private ArrayList sampleData(MongoCollection collection, Long sampleNum) { ArrayList query = new ArrayList<>(); query.add(new Document("$sample", new Document("size", sampleNum))); -return collection.aggregate(query).into(new ArrayList<>()); +// allowDiskUse to avoid mongo 'Sort exceeded memory limit' error +return collection.aggregate(query).allowDiskUse(true).into(new ArrayList<>()); } private static String buildConnectionString( @@ -159,6 +160,8 @@ public class MongoDBDatabaseSync extends DatabaseSync { String username = config.get(MongoDBSourceOptions.USERNAME); String password = 
config.get(MongoDBSourceOptions.PASSWORD); String database = config.get(MongoDBSourceOptions.DATABASE); +// note: just to unify job name, no other use. +config.setString("database-name", database); String collection = config.get(MongoDBSourceOptions.COLLECTION); if (StringUtils.isBlank(collection)) { collection = config.get(TABLE_NAME); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java index 4b20ebd6..8048e38a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java @@ -126,7 +126,13 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change public Map extractAfterRow(JsonNode recordRoot) { JsonNode dataNode = recordRoot.get(FIELD_DATA); Map rowMap = extractRow(dataNode); -String objectId = ((Map) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); +String objectId; +// if user specifies the `_id` field manually, the $oid field may not exist +if (rowMap.get(ID_FIELD) instanceof Map) { +objectId = ((Map) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); +} else { +objectId = rowMap.get(ID_FIELD).toString(); +} rowMap.put(ID_FIELD, objectId); return rowMap; } @@ -135,7 +141,13 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change throws JsonProcessingException { String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY); JsonNode jsonNode = objectMapper.readTree(documentKey); -String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD); +String objectId; +// if user specifies the `_id` field manually, the $oid field may not exist +if (jsonNode.get(ID_FIELD).has(OID_FIELD)) { +objectId = 
extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD); +} else { +objectId = jsonNode.get(ID_FIELD).asText(); +} Map row = new HashMap<>(); row.put(ID_FIELD, objectId); return row; - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: (Fix)[case] Fix unstable cases (#415)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new d4af6308 (Fix)[case] Fix unstable cases (#415) d4af6308 is described below commit d4af630843b4605f93686517d1f3939f300c40d2 Author: wudi <676366...@qq.com> AuthorDate: Tue Jul 2 11:06:25 2024 +0800 (Fix)[case] Fix unstable cases (#415) --- .../doris/flink/sink/copy/BatchStageLoad.java | 5 +++ .../doris/flink/sink/copy/DorisCopyWriter.java | 4 +- .../java/org/apache/doris/flink/sink/TestUtil.java | 42 +++ .../flink/sink/batch/TestDorisBatchStreamLoad.java | 23 ++- .../doris/flink/sink/copy/TestDorisCopyWriter.java | 48 +++--- 5 files changed, 75 insertions(+), 47 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java index 0080dee1..be8adcb0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java @@ -427,4 +427,9 @@ public class BatchStageLoad implements Serializable { public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) { this.httpClientBuilder = httpClientBuilder; } + +@VisibleForTesting +public boolean isLoadThreadAlive() { +return loadThreadAlive; +} } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java index 469b3f2d..b47f2ebe 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java @@ -197,7 +197,7 @@ public class 
DorisCopyWriter } @VisibleForTesting -public void setBatchStageLoad(BatchStageLoad batchStageLoad) { -this.batchStageLoad = batchStageLoad; +public BatchStageLoad getBatchStageLoad() { +return batchStageLoad; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java new file mode 100644 index ..9858e05e --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.util.function.SupplierWithException; + +import java.util.concurrent.TimeoutException; + +public class TestUtil { + +public static void waitUntilCondition( +SupplierWithException condition, +Deadline timeout, +long retryIntervalMillis, +String errorMsg) +throws Exception { +while (timeout.hasTimeLeft() && !(Boolean) condition.get()) { +long timeLeft = Math.max(0L, timeout.timeLeft().toMillis()); +Thread.sleep(Math.min(retryIntervalMillis, timeLeft)); +} + +if (!timeout.hasTimeLeft()) { +throw new TimeoutException(errorMsg); +} +} +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java index d73ff440..6080db2d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java @@ -18,13 +18,13 @@ package org.apache.doris.flink.sink.batch; import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.util.functio
svn commit: r70083 - /dev/doris/doris-shade/2.1.1/ /release/doris/doris-shade/2.1.1/
Author: diwu Date: Tue Jul 2 02:08:48 2024 New Revision: 70083 Log: move doris shade 2.1.1 to release Added: release/doris/doris-shade/2.1.1/ - copied from r70082, dev/doris/doris-shade/2.1.1/ Removed: dev/doris/doris-shade/2.1.1/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [feature]Add kafka to doris container test case (#33)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 528c901 [feature]Add kafka to doris container test case (#33) 528c901 is described below commit 528c901202067e0b851a626917906337d61d2270 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Mon Jul 1 16:22:47 2024 +0800 [feature]Add kafka to doris container test case (#33) --- .../workflows/kafka2doris-e2ecase.yaml | 33 ++- .licenserc.yaml| 1 + pom.xml| 24 +++ .../connector/e2e/doris/DorisContainerService.java | 29 +++ .../e2e/doris/DorisContainerServiceImpl.java | 188 .../connector/e2e/kafka/KafkaContainerService.java | 37 .../e2e/kafka/KafkaContainerServiceImpl.java | 239 + .../e2e/sink/AbstractKafka2DorisSink.java | 125 +++ .../stringconverter/AbstractStringE2ESinkTest.java | 62 ++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 95 .../e2e/string_converter/string_msg_connector.json | 21 ++ .../e2e/string_converter/string_msg_tab.sql| 12 ++ 12 files changed, 857 insertions(+), 9 deletions(-) diff --git a/.licenserc.yaml b/.github/workflows/kafka2doris-e2ecase.yaml similarity index 61% copy from .licenserc.yaml copy to .github/workflows/kafka2doris-e2ecase.yaml index ca50638..f30f5c1 100644 --- a/.licenserc.yaml +++ b/.github/workflows/kafka2doris-e2ecase.yaml @@ -15,15 +15,30 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+# +--- +name: E2E Test CI +on: + pull_request: + push: -header: - license: -spdx-id: Apache-2.0 -copyright-owner: Apache Software Foundation +jobs: + build-extension: +name: "e2e test case" +runs-on: ubuntu-latest +defaults: + run: +shell: bash +steps: + - name: Checkout +uses: actions/checkout@master - paths-ignore: -- 'LICENSE' -- '.gitignore' -- 'src/test/resources/decode/avro/**' + - name: Setup java +uses: actions/setup-java@v2 +with: + distribution: adopt + java-version: '8' - comment: on-failure \ No newline at end of file + - name: Doris E2E Test +run: | + mvn test -Dtest='org.apache.doris.kafka.connector.e2e.**' \ No newline at end of file diff --git a/.licenserc.yaml b/.licenserc.yaml index ca50638..07a811b 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -25,5 +25,6 @@ header: - 'LICENSE' - '.gitignore' - 'src/test/resources/decode/avro/**' +- 'src/test/resources/e2e/**' comment: on-failure \ No newline at end of file diff --git a/pom.xml b/pom.xml index 29005cb..fd99b7d 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ 4.5.13 2.3 2.2.0 +1.17.6 @@ -294,6 +295,18 @@ + +org.testcontainers +testcontainers +${testcontainers.version} +test + + +org.testcontainers +kafka +${testcontainers.version} +test + @@ -422,6 +435,17 @@ + + +org.apache.maven.plugins +maven-surefire-plugin +2.22.2 + + + **/org/apache/doris/kafka/connector/e2e/** + + + diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java new file mode 100644 index 000..79ec94d --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for
(doris-flink-connector) branch master updated: [improve] support group commit (#412)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 43a055a9 [improve] support group commit (#412) 43a055a9 is described below commit 43a055a9e6b4c728912725976eedbecdfb8b270c Author: wudi <676366...@qq.com> AuthorDate: Mon Jul 1 10:31:15 2024 +0800 [improve] support group commit (#412) --- .../java/org/apache/doris/flink/sink/HttpPutBuilder.java | 4 +++- .../apache/doris/flink/sink/batch/DorisBatchStreamLoad.java | 6 ++ .../org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 12 +++- .../org/apache/doris/flink/sink/writer/LoadConstants.java| 1 + .../apache/doris/flink/sink/copy/TestDorisCopyWriter.java| 3 +++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java index 023cd31a..44f6c9fe 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java @@ -111,7 +111,9 @@ public class HttpPutBuilder { } public HttpPutBuilder setLabel(String label) { -header.put("label", label); +if (label != null) { +header.put("label", label); +} return this; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index d9fba749..fbc6daa0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -67,6 +67,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import 
static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -95,6 +96,7 @@ public class DorisBatchStreamLoad implements Serializable { private AtomicReference exception = new AtomicReference<>(null); private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); private BackendUtil backendUtil; +private boolean enableGroupCommit; public DorisBatchStreamLoad( DorisOptions dorisOptions, @@ -120,6 +122,7 @@ public class DorisBatchStreamLoad implements Serializable { LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) .getBytes(); } +this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { @@ -260,6 +263,9 @@ public class DorisBatchStreamLoad implements Serializable { /** execute stream load. 
*/ public void load(String label, BatchRecordBuffer buffer) throws IOException { +if (enableGroupCommit) { +label = null; +} refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); ByteBuffer data = buffer.getData(); ByteArrayEntity entity = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 14e44dee..676de3df 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -58,6 +58,7 @@ import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -87,6
(doris-spark-connector) branch master updated: [Improve] decrease memory usage when csv is on (#212)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 3e745e7 [Improve] decrease memory usage when csv is on (#212) 3e745e7 is described below commit 3e745e732fdade8a26856bd92026b44fd02d2787 Author: zhaorongsheng AuthorDate: Mon Jul 1 10:20:40 2024 +0800 [Improve] decrease memory usage when csv is on (#212) Co-authored-by: zhaorongsheng --- .../org/apache/doris/spark/load/StreamLoader.scala | 33 +++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 9481b6f..06bb56f 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -20,6 +20,7 @@ package org.apache.doris.spark.load import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.json.JsonMapper +import org.apache.commons.io.IOUtils import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} @@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType import org.slf4j.{Logger, LoggerFactory} -import java.io.{ByteArrayOutputStream, IOException} +import java.io.{ByteArrayOutputStream, IOException, InputStream} import java.net.{HttpURLConnection, URL} import java.nio.charset.StandardCharsets import java.util @@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader if (compressType.nonEmpty) { if 
("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) { -val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava) +val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava) .format(format) .sep(columnSeparator) .delim(lineDelimiter) .schema(schema) .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough) -val content = recordBatchString.getContent -val compressedData = compressByGZ(content) +val compressedData = compressByGZ(recodeBatchInputStream) entity = Some(new ByteArrayEntity(compressedData)) } else { @@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader compressedData } + /** + * compress data by gzip + * + * @param contentInputStream data content + * @throws + * @return compressed byte array data + */ + @throws[IOException] + def compressByGZ(contentInputStream: InputStream): Array[Byte] = { +var compressedData: Array[Byte] = null +try { + val baos = new ByteArrayOutputStream + val gzipOutputStream = new GZIPOutputStream(baos) + try { +IOUtils.copy(contentInputStream, gzipOutputStream) +gzipOutputStream.finish() +compressedData = baos.toByteArray + } finally { +if (baos != null) baos.close() +if (gzipOutputStream != null) gzipOutputStream.close() + } +} +compressedData + } + /** * handle stream load response * - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated (0babde17bf2 -> d42e57f0283)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 0babde17bf2 [chore](Regression) Remove useless get provider code in regression framework (#37000) add d42e57f0283 [regression-test](connector) Add a case for the response of streamload that the connector depends (#36864) No new revisions were added by this update. Summary of changes: .../data/flink_connector_p0/test_response.csv | 2 + .../flink_connector_response.groovy| 186 + 2 files changed, 188 insertions(+) create mode 100644 regression-test/data/flink_connector_p0/test_response.csv create mode 100644 regression-test/suites/flink_connector_p0/flink_connector_response.groovy - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-shade) branch master updated: Upgrade paimon to 0.8.1 (#46)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-shade.git The following commit(s) were added to refs/heads/master by this push: new c84872a Upgrade paimon to 0.8.1 (#46) c84872a is described below commit c84872a09edae7c6c89d1c6a0f0e27439f2f68c4 Author: Calvin Kirs AuthorDate: Thu Jun 27 10:11:58 2024 +0800 Upgrade paimon to 0.8.1 (#46) --- CHANGE-LOG.md | 2 ++ hive-catalog-shade/pom.xml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGE-LOG.md b/CHANGE-LOG.md index de331b7..57116d0 100644 --- a/CHANGE-LOG.md +++ b/CHANGE-LOG.md @@ -4,6 +4,8 @@ 2.1.0 - Upgrade paimon to 0.8.0 - Upgrade bcpkix-jdkon15 dependency to bcpkix-jdkon18 + 2.1.1 +- Upgrade paimon to 0.8.1 ### 2.0 2.0.0 - upgrade avro to 1.11.3 diff --git a/hive-catalog-shade/pom.xml b/hive-catalog-shade/pom.xml index 53baa3f..6100968 100644 --- a/hive-catalog-shade/pom.xml +++ b/hive-catalog-shade/pom.xml @@ -31,7 +31,7 @@ under the License. 3.3.6 2.8.1 1.4.3 -0.8.0 +0.8.1 1.11.3 2.5.2 1.13.1 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: [fix](compatible) Fix cast error when select data from doris 2.0 (#209)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 1df1c96 [fix](compatible) Fix cast eror when select data from doris 2.0 (#209) 1df1c96 is described below commit 1df1c96b516cf3c5a07b61d0375edfc9eeff6801 Author: Lijia Liu AuthorDate: Wed Jun 26 17:07:59 2024 +0800 [fix](compatible) Fix cast eror when select data from doris 2.0 (#209) --- .../src/main/scala/org/apache/doris/spark/sql/Utils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala index 8910389..7cffbe5 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.sources._ import org.slf4j.Logger import java.sql.{Date, Timestamp} -import java.time.Duration +import java.time.{Duration, LocalDate} import java.util.concurrent.locks.LockSupport import scala.annotation.tailrec import scala.reflect.ClassTag @@ -106,6 +106,7 @@ private[spark] object Utils { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" case dateValue: Date => "'" + dateValue + "'" +case dateValue: LocalDate => "'" + dateValue + "'" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new e43e59f [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32) e43e59f is described below commit e43e59f5d42eda6a3844a9b8825a9971bcefe4ad Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jun 26 16:49:26 2024 +0800 [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32) --- .github/workflows/license-eyes.yml | 4 + .../workflows/license-eyes.yml => .licenserc.yaml | 31 ++-- .../kafka/connector/decode/DorisConverter.java | 40 + .../kafka/connector/decode/DorisJsonSchema.java| 90 ++ .../connector/decode/avro/DorisAvroConverter.java | 194 + .../connector/exception/DataDecodeException.java | 35 .../decode/avro/DorisAvroConverterTest.java| 96 ++ src/test/resources/decode/avro/product.avsc| 18 ++ src/test/resources/decode/avro/user.avsc | 18 ++ 9 files changed, 507 insertions(+), 19 deletions(-) diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml index 02d108a..ba0c303 100644 --- a/.github/workflows/license-eyes.yml +++ b/.github/workflows/license-eyes.yml @@ -23,6 +23,7 @@ on: push: branches: - master + jobs: license-check: name: "License Check" @@ -30,7 +31,10 @@ jobs: steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" uses: actions/checkout@v2 + - name: Check License uses: apache/skywalking-eyes@v0.2.0 +with: + config-path: ./.licenserc.yaml env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/license-eyes.yml b/.licenserc.yaml similarity index 66% copy from .github/workflows/license-eyes.yml copy to .licenserc.yaml index 02d108a..ca50638 100644 --- a/.github/workflows/license-eyes.yml +++ b/.licenserc.yaml @@ -15,22 +15,15 @@ # KIND, 
either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -name: License Check -on: - pull_request: - push: -branches: - - master -jobs: - license-check: -name: "License Check" -runs-on: ubuntu-latest -steps: - - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" -uses: actions/checkout@v2 - - name: Check License -uses: apache/skywalking-eyes@v0.2.0 -env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + +header: + license: +spdx-id: Apache-2.0 +copyright-owner: Apache Software Foundation + + paths-ignore: +- 'LICENSE' +- '.gitignore' +- 'src/test/resources/decode/avro/**' + + comment: on-failure \ No newline at end of file diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java new file mode 100644 index 000..6b0e960 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.decode; + +import java.util.Map; +import org.apache.doris.kafka.connector.exception.DataDecodeException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.storage.Converter; + +public abstract class DorisConverter implements Converter { + +/** unused */ +@Override +public void configure(final Map map, final boolean b) { +// not necessary +} + +/** doesn't support data source connector */ +@Override +public byte[] fromConnectData(String topic, Schema schema, Object value) { +throw new DataDecodeException("DorisConverter doesn't support data source connector yet."); +} +} diff --git a/src/main/java/or
(doris) branch master updated: [Chore](GA)Use github's codeowner to implement maintainer review (#36852)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new d1d87295688 [Chore](GA)Use github's codeowner to implement maintainer review (#36852) d1d87295688 is described below commit d1d872956883d537540317741757908ad9f7e0e7 Author: Calvin Kirs AuthorDate: Wed Jun 26 14:59:19 2024 +0800 [Chore](GA)Use github's codeowner to implement maintainer review (#36852) ## Proposed changes Compared with the previous method, the contributor experience is better through Github's ready-made method FYI https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners --- .asf.yaml | 1 + .github/CODEOWNERS | 18 + tools/maintainers/check_review.py | 40 -- tools/maintainers/maintainers.json | 20 --- 4 files changed, 23 insertions(+), 56 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index 61fb7e0346d..83a80db5ff6 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -69,6 +69,7 @@ github: required_pull_request_reviews: dismiss_stale_reviews: true +require_code_owner_reviews: true required_approving_review_count: 1 branch-1.1-lts: required_status_checks: diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000..15b12fc843e --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +be/src/io/* @platoneko @gavinchou @dataroaring +fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman diff --git a/tools/maintainers/check_review.py b/tools/maintainers/check_review.py index 755b6de1c59..728bb8c0a33 100644 --- a/tools/maintainers/check_review.py +++ b/tools/maintainers/check_review.py @@ -20,13 +20,12 @@ import json import match import sys -def check_review_pass(pr_num, maintainers_file, token): +def check_review_pass(pr_num, token): """ Checks if all necessary files have been reviewed by maintainers. Args: pr_num (int): PR number. - maintainers_file (str): Path to maintainers.json. token (str): GitHub token. 
Returns: @@ -46,50 +45,19 @@ def check_review_pass(pr_num, maintainers_file, token): # Create a list of reviewers who have approved approves = [reviewer for reviewer, status in latest_statuses.items() if status == 'APPROVED'] - - # Get list of changed files - response = requests.get(f"https://api.github.com/repos/apache/doris/pulls/{pr_num}/files;, headers=headers) - file_changes = response.json() - file_change_names = [file['filename'] for file in file_changes] - - # Read maintainers.json - with open(maintainers_file) as f: - data = json.load(f) - need_maintainers_review_path = [item['path'] for item in data['paths']] - - # Check if each path's files have been reviewed by a maintainer - has_maintainer_review = True - for file in file_change_names: - path_found = False - for path_item in data['paths']: - path = path_item['path'] - maintainers = path_item['maintainers'] - - if match.match(file, path): - path_found = True - if maintainers: - if not any(maintainer in approves for maintainer in maintainers): - has_maintainer_review = False - break - else: - continue - - if not path_found: - continue print(approves) if len(approves) < 2: print("PR has not been approved by at least 2 reviewers") exit(1) - - return has_maintainer_review + else: + return True if __name__ == "__main__": pr_num = sys.argv[1] token = sys.argv[2] - maintainers_file = 'tools/maintainers/maintainers.json' # Adjust path if needed - if check_review_pass(pr_num, maintainers_file, token): + if check_review_pass(pr_n
(doris-flink-connector) branch master updated: [improve] modify type judgement statements (#405)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 8722c3e8 [improve] modify type judgement statements (#405) 8722c3e8 is described below commit 8722c3e8f543d2fc059622dc432c8cd6db7083c4 Author: xiayang AuthorDate: Wed Jun 26 14:20:26 2024 +0800 [improve] modify type judgement statements (#405) --- .../java/org/apache/doris/flink/serialization/RowBatch.java | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 81fe69e1..c7afe7f5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -18,7 +18,6 @@ package org.apache.doris.flink.serialization; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.Preconditions; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BaseIntVector; @@ -177,8 +176,14 @@ public class RowBatch { final String currentType = schema.get(col).getType(); for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector); -Preconditions.checkArgument( -passed, typeMismatchMessage(currentType, minorType)); +if (!passed) { +throw new java.lang.IllegalArgumentException( +"FLINK type is " ++ currentType ++ ", but arrow type is " ++ minorType.name() ++ "."); +} } } } catch (Exception e) { - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [improve] support modify column type/comment, support auto create database if not exists (#408)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 35771549 [improve] support modify column type/comment, support auto create database if not exists (#408) 35771549 is described below commit 35771549180bed4981689071de000fd6e2b9e080 Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Wed Jun 26 14:19:51 2024 +0800 [improve] support modify column type/comment, support auto create database if not exists (#408) --- .../flink/sink/schema/SchemaChangeHelper.java | 67 ++-- .../flink/sink/schema/SchemaChangeManager.java | 75 - .../flink/sink/schema/SchemaManagerITCase.java | 121 + 3 files changed, 230 insertions(+), 33 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 4c29c348..8d365ffc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Map.Entry; public class SchemaChangeHelper { +public static final String DEFAULT_DATABASE = "information_schema"; + private static final List dropFieldSchemas = Lists.newArrayList(); private static final List addFieldSchemas = Lists.newArrayList(); // Used to determine whether the doris table supports ddl @@ -38,6 +40,11 @@ public class SchemaChangeHelper { private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s"; private static final String CHECK_COLUMN_EXISTS = "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'"; +private static final String 
CHECK_DATABASE_EXISTS = +"SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = '%s'"; +private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT EXISTS %s"; +private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s"; +private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY COLUMN %s COMMENT '%s'"; public static void compareSchema( Map updateFiledSchemaMap, @@ -104,19 +111,18 @@ public class SchemaChangeHelper { String type = fieldSchema.getTypeString(); String defaultValue = fieldSchema.getDefaultValue(); String comment = fieldSchema.getComment(); -String addDDL = -String.format( -ADD_DDL, -DorisSystem.quoteTableIdentifier(tableIdentifier), -DorisSystem.identifier(name), -type); +StringBuilder addDDL = +new StringBuilder( +String.format( +ADD_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), +DorisSystem.identifier(name), +type)); if (defaultValue != null) { -addDDL = addDDL + " DEFAULT " + DorisSystem.quoteDefaultValue(defaultValue); -} -if (!StringUtils.isNullOrWhitespaceOnly(comment)) { -addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'"; +addDDL.append(" DEFAULT ").append(DorisSystem.quoteDefaultValue(defaultValue)); } -return addDDL; +commentColumn(addDDL, comment); +return addDDL.toString(); } public static String buildDropColumnDDL(String tableIdentifier, String columName) { @@ -139,6 +145,45 @@ public class SchemaChangeHelper { return String.format(CHECK_COLUMN_EXISTS, database, table, column); } +public static String buildDatabaseExistsQuery(String database) { +return String.format(CHECK_DATABASE_EXISTS, database); +} + +public static String buildCreateDatabaseDDL(String database) { +return String.format(CREATE_DATABASE_DDL, database); +} + +public static String buildModifyColumnCommentDDL( +String tableIdentifier, String columnName, String newComment) { +return String.format( +MODIFY_COMMENT_DDL, 
+DorisSystem.quoteTableIdentifier(tableIdentifier), +DorisSystem.identifier(columnName), +DorisSystem.quoteComment(newComment)); +} + +public static String buil
(doris-flink-connector) branch master updated: [improve](testcase) add unit test for flink type (#409)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 9ea56b9f [improve](testcase) add unit test for flink type (#409) 9ea56b9f is described below commit 9ea56b9f5075160ac0072d0aa7f0d9c8c2d4542e Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Jun 26 14:19:00 2024 +0800 [improve](testcase) add unit test for flink type (#409) --- .../doris/flink/catalog/DorisTypeMapper.java | 28 +++ .../doris/flink/catalog/DorisTypeMapperTest.java | 42 -- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index ba612256..a8e3d7d6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -29,14 +29,18 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarBinaryType; import 
org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.ZonedTimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.doris.flink.catalog.doris.DorisType; @@ -226,6 +230,25 @@ public class DorisTypeMapper { "%s(%s)", DorisType.DATETIME_V2, Math.min(Math.max(precision, 0), 6)); } +@Override +public String visit(ZonedTimestampType timestampType) { +int precision = timestampType.getPrecision(); +return String.format( +"%s(%s)", DorisType.DATETIME_V2, Math.min(Math.max(precision, 0), 6)); +} + +@Override +public String visit(LocalZonedTimestampType localZonedTimestampType) { +int precision = localZonedTimestampType.getPrecision(); +return String.format( +"%s(%s)", DorisType.DATETIME_V2, Math.min(Math.max(precision, 0), 6)); +} + +@Override +public String visit(TimeType timeType) { +return STRING; +} + @Override public String visit(ArrayType arrayType) { return STRING; @@ -241,6 +264,11 @@ public class DorisTypeMapper { return STRING; } +@Override +public String visit(MultisetType multisetType) { +return STRING; +} + @Override public String visit(BinaryType binaryType) { return STRING; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java index 2be07913..84cd367c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java @@ -76,12 +76,16 @@ public class DorisTypeMapperTest { public void testDecimalType() { DataType result = DorisTypeMapper.toFlinkType("col", "DECIMAL", 10, 2); assertEquals(DataTypes.DECIMAL(10, 2), result); +String dorisType = DorisTypeMapper.toDorisType(DataTypes.DECIMAL(10, 2)); +assertEquals("DECIMALV3(10,2)", dorisType); } @Test public void testDecimalV3Type() { DataType 
result = DorisTypeMapper.toFlinkType("col", "DECIMALV3", 10, 2); assertEquals(DataTypes.DECIMAL(10, 2), result); +String dorisType = DorisTypeMapper.toDorisType(DataTypes.DECIMAL(10, 2)); +assertEquals("DECIMALV3(10,2)", dorisType); } @Test @@ -136,8 +140,42 @@ public class DorisTypeMapperTest { public void testDatetimeType() { DataType result = DorisTypeMapper.toFlinkType("col&
(doris-kafka-connector) branch master updated: [fix]Fix npe exception caused by closing jmx (#31)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 01a6c7d [fix]Fix npe exception caused by closing jmx (#31) 01a6c7d is described below commit 01a6c7d96581ed1adb08d684b1fbb9e861a2bd7b Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Jun 21 17:58:35 2024 +0800 [fix]Fix npe exception caused by closing jmx (#31) --- .../connector/metrics/DorisConnectMonitor.java | 34 -- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java index e5d1bd4..c3b28a2 100644 --- a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java +++ b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java @@ -46,6 +46,7 @@ public class DorisConnectMonitor { private Histogram partitionBufferCountHistogram; private final AtomicLong buffMemoryUsage; private final int taskId; +private final boolean enableCustomJMX; public DorisConnectMonitor( final boolean enableCustomJMXConfig, @@ -59,7 +60,8 @@ public class DorisConnectMonitor { this.buffMemoryUsage = new AtomicLong(0); this.taskId = taskId; -if (enableCustomJMXConfig) { +this.enableCustomJMX = enableCustomJMXConfig; +if (this.enableCustomJMX) { registerJMXMetrics(metricsJmxReporter); LOG.info("init DorisConnectMonitor, taskId={}", taskId); } @@ -134,31 +136,45 @@ public class DorisConnectMonitor { } public void setCommittedOffset(long committedOffset) { -this.committedOffset.set(committedOffset); +if (enableCustomJMX) { +this.committedOffset.set(committedOffset); +} } public void addAndGetLoadCount() { -this.totalLoadCount.getAndIncrement(); +if (enableCustomJMX) { 
+this.totalLoadCount.getAndIncrement(); +} } public void addAndGetTotalNumberOfRecord(long totalNumberOfRecord) { -this.totalNumberOfRecord.addAndGet(totalNumberOfRecord); +if (enableCustomJMX) { +this.totalNumberOfRecord.addAndGet(totalNumberOfRecord); +} } public void addAndGetTotalSizeOfData(long totalSizeOfData) { -this.totalSizeOfData.addAndGet(totalSizeOfData); +if (enableCustomJMX) { +this.totalSizeOfData.addAndGet(totalSizeOfData); +} } public void addAndGetBuffMemoryUsage(long memoryUsage) { -this.buffMemoryUsage.addAndGet(memoryUsage); +if (enableCustomJMX) { +this.buffMemoryUsage.addAndGet(memoryUsage); +} } public void resetMemoryUsage() { -this.buffMemoryUsage.set(0L); +if (enableCustomJMX) { +this.buffMemoryUsage.set(0L); +} } public void updateBufferMetrics(long bufferSizeBytes, int numOfRecords) { -partitionBufferSizeBytesHistogram.update(bufferSizeBytes); -partitionBufferCountHistogram.update(numOfRecords); +if (enableCustomJMX) { +partitionBufferSizeBytesHistogram.update(bufferSizeBytes); +partitionBufferCountHistogram.update(numOfRecords); +} } } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [Improve]Optimize and delete some redundant code (#30)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new b18df3e [Improve]Optimize and delete some redundant code (#30) b18df3e is described below commit b18df3e76932a8ab7bdda1a4e746727480cd64f5 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Jun 21 17:56:10 2024 +0800 [Improve]Optimize and delete some redundant code (#30) --- .../doris/kafka/connector/DorisSinkConnector.java | 1 - .../doris/kafka/connector/DorisSinkTask.java | 4 +- .../doris/kafka/connector/cfg/DorisOptions.java| 54 -- .../kafka/connector/converter/RecordService.java | 5 -- .../doris/kafka/connector/utils/BackendUtils.java | 25 -- .../kafka/connector/utils/ConfigCheckUtils.java| 10 .../doris/kafka/connector/utils/FileNameUtils.java | 11 + .../kafka/connector/writer/CopyIntoWriter.java | 2 +- .../doris/kafka/connector/writer/DorisWriter.java | 1 - .../kafka/connector/writer/LoadConstants.java | 1 - .../connector/converter/TestRecordService.java | 7 --- 11 files changed, 4 insertions(+), 117 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java index bd1fe20..0bf90bd 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java @@ -38,7 +38,6 @@ public class DorisSinkConnector extends SinkConnector { private static final Logger LOG = LoggerFactory.getLogger(DorisSinkConnector.class); private Map config; -private String connectorName; /** No-Arg constructor. 
Required by Kafka Connect framework */ public DorisSinkConnector() {} diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java index 56faf7e..00c034c 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java @@ -119,12 +119,12 @@ public class DorisSinkTask extends SinkTask { @Override public Map preCommit( Map offsets) throws RetriableException { - -sink.commit(offsets); // return an empty map means that offset commitment is not desired if (sink == null || sink.getPartitionCount() == 0) { return new HashMap<>(); } + +sink.commit(offsets); Map committedOffsets = new HashMap<>(); // it's ok to just log the error since commit can retry try { diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index d5eaaf2..69cbb80 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -299,58 +299,4 @@ public class DorisOptions { } return new HashMap<>(); } - -@Override -public String toString() { -return "DorisOptions{" -+ "name='" -+ name -+ '\'' -+ ", urls='" -+ urls -+ '\'' -+ ", queryPort=" -+ queryPort -+ ", httpPort=" -+ httpPort -+ ", user='" -+ user -+ '\'' -+ ", password='" -+ password -+ '\'' -+ ", database='" -+ database -+ '\'' -+ ", topicMap=" -+ topicMap -+ ", fileSize=" -+ fileSize -+ ", recordNum=" -+ recordNum -+ ", flushTime=" -+ flushTime -+ ", enableCustomJMX=" -+ enableCustomJMX -+ ", taskId=" -+ taskId -+ ", enableDelete=" -+ enableDelete -+ ", autoRedirect=" -+ autoRedirect -+ ", requestReadTimeoutMs=" -+ requestReadTimeoutMs -+ ", requestConnectTimeoutMs=" -+ requestConnectTimeoutMs -+ ", streamLoadProp=" -+ streamLoadProp -+ ", labelPrefix='" -+ labelPrefix -+ '\'' -
(doris-flink-connector) branch master updated: [fix] Fix garbled of table or column comments contain Chinese characters(#401) (#403)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 0acfbf2d [fix] Fix garbled of table or column comments contain Chinese characters(#401) (#403) 0acfbf2d is described below commit 0acfbf2d1fb76208178bef391f65ede02a1380a5 Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Tue Jun 18 10:28:00 2024 +0800 [fix] Fix garbled of table or column comments contain Chinese characters(#401) (#403) --- .../flink/sink/schema/SchemaChangeManager.java | 16 -- .../flink/sink/schema/SchemaManagerITCase.java | 57 ++ .../doris/flink/sink/schema/SchemaManagerTest.java | 6 +++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 2aca3c7b..d2bacf26 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -56,11 +56,17 @@ public class SchemaChangeManager implements Serializable { private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s;; private ObjectMapper objectMapper = new ObjectMapper(); private DorisOptions dorisOptions; +private String charsetEncoding = "UTF-8"; public SchemaChangeManager(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; } +public SchemaChangeManager(DorisOptions dorisOptions, String charsetEncoding) { +this.dorisOptions = dorisOptions; +this.charsetEncoding = charsetEncoding; +} + public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException { String createTableDDL = DorisSystem.buildCreateTableDDL(table); 
return execute(createTableDDL, table.getDatabase()); @@ -133,7 +139,8 @@ public class SchemaChangeManager implements Serializable { table); HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); -httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); +httpGet.setEntity( +new StringEntity(objectMapper.writeValueAsString(params), charsetEncoding)); String responseEntity = ""; Map responseMap = handleResponse(httpGet, responseEntity); return handleSchemaChange(responseMap, responseEntity); @@ -173,8 +180,11 @@ public class SchemaChangeManager implements Serializable { database); HttpPost httpPost = new HttpPost(requestUrl); httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); -httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); -httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); +httpPost.setHeader( +HttpHeaders.CONTENT_TYPE, +String.format("application/json;charset=%s", charsetEncoding)); +httpPost.setEntity( +new StringEntity(objectMapper.writeValueAsString(param), charsetEncoding)); return httpPost; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index 053cf65c..8d2a9b0d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -28,8 +28,11 @@ import org.junit.Test; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; public class SchemaManagerITCase extends DorisTestBase { @@ -82,6 +85,60 @@ public class SchemaManagerITCase extends DorisTestBase { 
Assert.assertTrue(exists); } +@Test +public void testAddColumnWithChineseComment() +throws SQLException, IOException, IllegalArgumentException { +String addColumnTbls = "add_column"; +initDorisSchemaChangeTable(addColumnTbls); + +// add a column by UTF-8 encoding +String addColumnName = "col_with_comment1"; +String chineseComment = "中文注释1"; +addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComm
(doris) branch master updated (6ce368783c5 -> c5aad8dcdbc)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 6ce368783c5 [fix](mtmv)mtmv support default key (#36221) add c5aad8dcdbc [regression-test](flink-connector) add flink connector data type case (#35219) No new revisions were added by this update. Summary of changes: .../flink_connector_p0/flink_connector_type.out| 5 + regression-test/framework/pom.xml | 31 .../flink_connector_p0/flink_connector_type.groovy | 188 + 3 files changed, 224 insertions(+) create mode 100644 regression-test/data/flink_connector_p0/flink_connector_type.out create mode 100644 regression-test/suites/flink_connector_p0/flink_connector_type.groovy - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-website) branch master updated: [Doc](connector) update version (#718)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-website.git The following commit(s) were added to refs/heads/master by this push: new 68a888e9139 [Doc](connector) update version (#718) 68a888e9139 is described below commit 68a888e913925eac50a6b23ea4c03a09e7d1f086 Author: wudi <676366...@qq.com> AuthorDate: Wed Jun 12 14:45:50 2024 +0800 [Doc](connector) update version (#718) --- docs/ecosystem/flink-doris-connector.md | 1 + docs/ecosystem/spark-doris-connector.md | 2 +- .../current/ecosystem/flink-doris-connector.md | 2 +- .../version-2.0/ecosystem/flink-doris-connector.md | 12 +++- .../version-2.0/ecosystem/spark-doris-connector.md | 7 --- .../version-2.1/ecosystem/flink-doris-connector.md | 2 +- .../version-2.1/ecosystem/spark-doris-connector.md | 2 +- .../version-2.0/ecosystem/flink-doris-connector.md | 12 +++- .../version-2.0/ecosystem/spark-doris-connector.md | 7 --- .../version-2.1/ecosystem/flink-doris-connector.md | 2 +- .../version-2.1/ecosystem/spark-doris-connector.md | 2 +- 11 files changed, 29 insertions(+), 22 deletions(-) diff --git a/docs/ecosystem/flink-doris-connector.md b/docs/ecosystem/flink-doris-connector.md index c9a2576d277..02abce7561a 100644 --- a/docs/ecosystem/flink-doris-connector.md +++ b/docs/ecosystem/flink-doris-connector.md @@ -45,6 +45,7 @@ under the License. 
| 1.3.0 | 1.16| 1.0+ | 8| - | | 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- | | 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 |- | +| 1.6.1 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 | - | ## USE diff --git a/docs/ecosystem/spark-doris-connector.md b/docs/ecosystem/spark-doris-connector.md index 1284705471c..3d0058b68a8 100644 --- a/docs/ecosystem/spark-doris-connector.md +++ b/docs/ecosystem/spark-doris-connector.md @@ -39,7 +39,7 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |---|-|-|--|| -| 1.3.1 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 + | 8| 2.12, 2.11 | +| 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 + | 8| 2.12, 2.11 | | 1.2.0 | 3.2, 3.1, 2.3 | 1.0 + | 8| 2.12, 2.11 | | 1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8| 2.12, 2.11 | | 1.0.1 | 3.1, 2.3| 0.12 - 0.15 | 8| 2.12, 2.11 | diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md index 58022ddbdcc..aebb3227c7e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md @@ -47,7 +47,7 @@ under the License. | 1.3.0 | 1.16| 1.0+ | 8| - | | 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- | | 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 |- | -| 1.6.0 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 |- | +| 1.6.1 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 |- | ## 使用 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/flink-doris-connector.md index 20f69ed689a..60a677f58ec 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/flink-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/flink-doris-connector.md @@ -39,11 +39,13 @@ under the License. 
| Connector Version | Flink Version | Doris Version | Java Version | Scala Version | | - | - | -- | | - | -| 1.0.3 | 1.11+ | 0.15+ | 8| 2.11,2.12 | -| 1.1.1 | 1.14 | 1.0+ | 8| 2.11,2.12 | -| 1.2.1 | 1.15 | 1.0+ | 8| - | -| 1.3.0 | 1.16 | 1.0+ | 8| - | -| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- | +| 1.0.3 | 1.11,1.12,1.13,1.14 | 0.15+ | 8| 2.11,2.12 | +| 1.1.1 | 1.14| 1.0+ | 8| 2.11,2.12 | +| 1.2.1 | 1.15| 1.0+ | 8| - | +| 1.3.0 | 1.16| 1.0+ | 8| - | +| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- | +| 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 |- | +| 1.6.1 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 | - | ## 使用 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/eco
(doris-kafka-connector) branch master updated: [Improve]Optimize parameter configuration and ignore parameter case (#28)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new a2c823c [Improve]Optimize parameter configuration and ignore parameter case (#28) a2c823c is described below commit a2c823cbdcf77f92413166cbe6de4f237f83a657 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jun 12 09:56:14 2024 +0800 [Improve]Optimize parameter configuration and ignore parameter case (#28) --- .../doris/kafka/connector/DorisSinkConnector.java | 3 +- .../doris/kafka/connector/cfg/DorisOptions.java| 65 +- .../connector/cfg/DorisSinkConnectorConfig.java| 38 +-- .../kafka/connector/converter/ConverterMode.java | 4 ++ .../converter/schema/SchemaEvolutionMode.java | 4 ++ .../kafka/connector/utils/ConfigCheckUtils.java| 49 ++ .../kafka/connector/writer/DeliveryGuarantee.java | 4 ++ .../kafka/connector/writer/load/LoadModel.java | 4 ++ .../kafka/connector/cfg/TestDorisOptions.java | 1 + .../cfg/TestDorisSinkConnectorConfig.java | 77 ++ .../connector/converter/TestRecordService.java | 4 +- .../kafka/connector/writer/TestCopyIntoWriter.java | 2 + .../doris/kafka/connector/writer/TestCopyLoad.java | 2 + .../connector/writer/TestStreamLoadWriter.java | 2 + 14 files changed, 202 insertions(+), 57 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java index f6cf64b..bd1fe20 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java @@ -52,9 +52,8 @@ public class DorisSinkConnector extends SinkConnector { @Override public void start(final Map parsedConfig) { LOG.info("doris sink connector start"); -config = new HashMap<>(parsedConfig); +config = 
DorisSinkConnectorConfig.convertToLowercase(parsedConfig); DorisSinkConnectorConfig.setDefaultValues(config); - ConfigCheckUtils.validateConfig(config); } diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 3db78ca..d5eaaf2 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -47,23 +47,23 @@ public class DorisOptions { private final Map topicMap; private final int fileSize; private final int recordNum; -private long flushTime; -private boolean enableCustomJMX; +private final long flushTime; +private final boolean enableCustomJMX; private final int taskId; private final boolean enableDelete; -private boolean enable2PC; +private final boolean enable2PC; private boolean autoRedirect = true; private int requestReadTimeoutMs; private int requestConnectTimeoutMs; /** Properties for the StreamLoad. 
*/ private final Properties streamLoadProp = new Properties(); -private String labelPrefix; -private String databaseTimeZone; -private LoadModel loadModel; -private DeliveryGuarantee deliveryGuarantee; -private ConverterMode converterMode; -private SchemaEvolutionMode schemaEvolutionMode; +@Deprecated private String labelPrefix; +private final String databaseTimeZone; +private final LoadModel loadModel; +private final DeliveryGuarantee deliveryGuarantee; +private final ConverterMode converterMode; +private final SchemaEvolutionMode schemaEvolutionMode; public DorisOptions(Map config) { this.name = config.get(DorisSinkConnectorConfig.NAME); @@ -74,54 +74,25 @@ public class DorisOptions { this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD); this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE); this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID)); -this.databaseTimeZone = DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT; -if (config.containsKey(DorisSinkConnectorConfig.DATABASE_TIME_ZONE)) { -this.databaseTimeZone = config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE); -} -this.loadModel = -LoadModel.of( -config.getOrDefault( -DorisSinkConnectorConfig.LOAD_MODEL, -DorisSinkConnectorConfig.LOAD_MODEL_DEFAULT)); +this.databaseTimeZone = config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE); +this.loadModel = LoadModel.of(config.get(DorisSinkConn
(doris-flink-connector) branch master updated: [fix](cdc)fix excluding pattern not working (#390)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new fb254846 [fix](cdc)fix excluding pattern not working (#390) fb254846 is described below commit fb25484677481af5a0d18b48f900122f883f614e Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Fri Jun 7 11:15:00 2024 +0800 [fix](cdc)fix excluding pattern not working (#390) --- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 +- .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 3 +-- .../doris/flink/tools/cdc/DatabaseSyncTest.java| 28 +++--- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index a1f511a8..691eaafa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -376,7 +376,7 @@ public abstract class DatabaseSync { } else { String excludingPattern = String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables); -return String.format("(%s)(%s)", includingPattern, excludingPattern); +return String.format("(%s)(%s)", excludingPattern, includingPattern); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 03d3d076..63522472 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -230,8 +230,7 @@ public class MysqlDatabaseSync 
extends DatabaseSync { @Override public String getTableListPrefix() { -String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME); -return databaseName; +return config.get(MySqlSourceOptions.DATABASE_NAME); } /** diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java index 1e69c598..f0cd0a51 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java @@ -33,6 +33,8 @@ import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** Unit tests for the {@link DatabaseSync}. */ public class DatabaseSyncTest { @@ -69,7 +71,7 @@ public class DatabaseSyncTest { public void getTableBucketsTest() throws SQLException { String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50"; DatabaseSync databaseSync = new MysqlDatabaseSync(); -Map tableBucketsMap = databaseSync.getTableBuckets(tableBuckets); +Map tableBucketsMap = DatabaseSync.getTableBuckets(tableBuckets); assertEquals(10, tableBucketsMap.get("tbl1").intValue()); assertEquals(20, tableBucketsMap.get("tbl2").intValue()); assertEquals(30, tableBucketsMap.get("a.*").intValue()); @@ -81,7 +83,7 @@ public class DatabaseSyncTest { public void setTableSchemaBucketsTest() throws SQLException { DatabaseSync databaseSync = new MysqlDatabaseSync(); String tableSchemaBuckets = "tbl1:10,tbl2:20,a11.*:30,a1.*:40,b.*:50,b1.*:60,.*:70"; -Map tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets); +Map tableBucketsMap = DatabaseSync.getTableBuckets(tableSchemaBuckets); List tableList = Arrays.asList( "tbl1", "tbl2", "tbl3", "a11", "a111", "a12", "a13", "b1", "b11", "b2", @@ -103,7 +105,7 @@ public class 
DatabaseSyncTest { public void setTableSchemaBucketsTest1() throws SQLException { DatabaseSync databaseSync = new MysqlDatabaseSync(); String tableSchemaBuckets = ".*:10,a.*:20,tbl:30,b.*:40"; -Map tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets); +Map tableBucketsMap = DatabaseSync.getTableBuckets(tableSchemaBuckets); List tableList = Ar
(doris-flink-connector) branch master updated: [fix] Support converting type BINARY of Flink to Doris type (#397) (#398)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new a1cadf40 [fix] Support converting type BINARY of Flink to Doris type (#397) (#398) a1cadf40 is described below commit a1cadf407b45e50f9100a0df7b739f4d758f050d Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Fri Jun 7 11:14:21 2024 +0800 [fix] Support converting type BINARY of Flink to Doris type (#397) (#398) --- .../main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java | 6 ++ .../java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java| 6 ++ 2 files changed, 12 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index 55ab7b73..ba612256 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -21,6 +21,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.DateType; @@ -240,6 +241,11 @@ public class DorisTypeMapper { return STRING; } +@Override +public String visit(BinaryType binaryType) { +return STRING; +} + @Override protected String defaultMethod(LogicalType logicalType) { throw new UnsupportedOperationException( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java index 5d102f9e..2be07913 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java @@ -176,4 +176,10 @@ public class DorisTypeMapperTest { DataTypes.ROW(DataTypes.FIELD("field", DataTypes.INT(; assertEquals("STRING", dorisType); } + +@Test +public void testBinaryType() { +String dorisType = DorisTypeMapper.toDorisType(DataTypes.BINARY(1)); +assertEquals("STRING", dorisType); +} } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [Improve] Fix Httpclient connection in Stream Writer (#399)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new c4339a82 [Improve] Fix Httpclient connection in Stream Writer (#399) c4339a82 is described below commit c4339a829411fac792a4a9c6863a14ac0d93a5c9 Author: wudi <676366...@qq.com> AuthorDate: Fri Jun 7 11:13:25 2024 +0800 [Improve] Fix Httpclient connection in Stream Writer (#399) --- .../src/main/java/org/apache/doris/flink/sink/HttpUtil.java | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java index 1307ce40..518eea71 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.sink; import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.NoConnectionReuseStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; @@ -27,6 +28,13 @@ import java.util.concurrent.TimeUnit; /** util to build http client. 
*/ public class HttpUtil { + +private RequestConfig requestConfigStream = +RequestConfig.custom() +.setConnectTimeout(60 * 1000) +.setConnectionRequestTimeout(60 * 1000) +.build(); + private final HttpClientBuilder httpClientBuilder = HttpClients.custom() .setRedirectStrategy( @@ -36,8 +44,10 @@ public class HttpUtil { return true; } }) + .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) .evictExpiredConnections() -.evictIdleConnections(60, TimeUnit.SECONDS); +.evictIdleConnections(60, TimeUnit.SECONDS) +.setDefaultRequestConfig(requestConfigStream); public CloseableHttpClient getHttpClient() { return httpClientBuilder.build(); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [feature] Support importing MySQL geographic data types and PostgreSQL array data types (#25)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new f5040b7 [feature] Support importing MySQL geographic data types and PostgreSQL array data types (#25) f5040b7 is described below commit f5040b77f001cfc12ad9aa82235454df8852 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Thu Jun 6 16:06:03 2024 +0800 [feature] Support importing MySQL geographic data types and PostgreSQL array data types (#25) --- pom.xml| 12 +++ .../kafka/connector/converter/RecordService.java | 2 +- .../connector/converter/RecordTypeRegister.java| 8 ++ .../converter/type/AbstractGeometryType.java | 30 ++ .../doris/kafka/connector/converter/type/Type.java | 4 + .../converter/type/debezium/ArrayType.java | 107 + .../converter/type/debezium/GeographyType.java | 32 ++ .../converter/type/debezium/GeometryType.java | 47 + .../converter/type/debezium/PointType.java | 46 + .../connector/converter/type/util/GeoUtils.java| 67 + .../connector/converter/TestRecordService.java | 62 +++- 11 files changed, 414 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index ddbeb2f..29005cb 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,7 @@ 2.13.2.1 4.5.13 2.3 +2.2.0 @@ -282,6 +283,17 @@ debezium-core ${debezium.version} + +com.esri.geometry +esri-geometry-api +${geometry.version} + + +com.fasterxml.jackson.core +jackson-core + + + diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java index cd76c42..7c75139 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java @@ -278,7 +278,7 @@ public class RecordService { field.getSchema().isOptional() ? 
source.getWithoutDefault(fieldName) : source.get(fieldName); -Object convertValue = type.getValue(value); +Object convertValue = type.getValue(value, field.getSchema()); if (Objects.nonNull(convertValue) && !type.isNumber()) { filedMapping.put(fieldName, convertValue.toString()); } else { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java index e78909d..5cf1001 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java @@ -36,11 +36,15 @@ import org.apache.doris.kafka.connector.converter.type.connect.ConnectMapToConne import org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType; import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimeType; import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimestampType; +import org.apache.doris.kafka.connector.converter.type.debezium.ArrayType; import org.apache.doris.kafka.connector.converter.type.debezium.DateType; +import org.apache.doris.kafka.connector.converter.type.debezium.GeographyType; +import org.apache.doris.kafka.connector.converter.type.debezium.GeometryType; import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimeType; import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimestampType; import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimeType; import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimestampType; +import org.apache.doris.kafka.connector.converter.type.debezium.PointType; import org.apache.doris.kafka.connector.converter.type.debezium.TimeType; import org.apache.doris.kafka.connector.converter.type.debezium.TimestampType; import org.apache.doris.kafka.connector.converter.type.debezium.VariableScaleDecimalType; @@ -73,6 +77,10 @@ public 
class RecordTypeRegister { registerType(ZonedTimeType.INSTANCE); registerType(ZonedTimestampType.INSTANCE); registerType(VariableScaleDecimalType.INSTANCE); +registerType(PointType.INSTANCE); +registerType(GeographyType.INSTANCE); +registerType(GeometryType.INSTANCE); +registerType(ArrayType.INSTANCE); // Sup
(doris-kafka-connector) branch master updated: [feature]Support stream load configuration TwoPhaseCommit (#27)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 73a4f9c [feature]Support stream load configuration TwoPhaseCommit (#27) 73a4f9c is described below commit 73a4f9cd1fe82099364fde9081bbc754a1e70ba0 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Jun 6 16:00:12 2024 +0800 [feature]Support stream load configuration TwoPhaseCommit (#27) --- .../java/org/apache/doris/kafka/connector/cfg/DorisOptions.java | 9 + .../doris/kafka/connector/cfg/DorisSinkConnectorConfig.java | 2 ++ .../org/apache/doris/kafka/connector/utils/HttpPutBuilder.java | 4 ++-- .../doris/kafka/connector/writer/commit/DorisCommitter.java | 2 +- .../doris/kafka/connector/writer/load/DorisStreamLoad.java | 2 +- 5 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 5747925..3db78ca 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -51,6 +51,7 @@ public class DorisOptions { private boolean enableCustomJMX; private final int taskId; private final boolean enableDelete; +private boolean enable2PC; private boolean autoRedirect = true; private int requestReadTimeoutMs; private int requestConnectTimeoutMs; @@ -112,6 +113,10 @@ public class DorisOptions { } this.topicMap = getTopicToTableMap(config); +this.enable2PC = DorisSinkConnectorConfig.ENABLE_2PC_DEFAULT; +if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) { +this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC)); +} enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT; if 
(config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) { enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT)); @@ -202,6 +207,10 @@ public class DorisOptions { return topicMap.get(topic); } +public boolean enable2PC() { +return enable2PC; +} + public Map getTopicMap() { return topicMap; } diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index f4cabb3..02c24d0 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -92,6 +92,8 @@ public class DorisSinkConnectorConfig { public static final String ENABLE_DELETE = "enable.delete"; public static final boolean ENABLE_DELETE_DEFAULT = false; +public static final String ENABLE_2PC = "enable.2pc"; +public static final boolean ENABLE_2PC_DEFAULT = true; private static final ConfigDef.Validator nonEmptyStringValidator = new ConfigDef.NonEmptyString(); diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java b/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java index a51961c..36161ac 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java @@ -110,8 +110,8 @@ public class HttpPutBuilder { return this; } -public HttpPutBuilder enable2PC() { -header.put("two_phase_commit", "true"); +public HttpPutBuilder enable2PC(boolean enable2PC) { +header.put("two_phase_commit", String.valueOf(enable2PC)); return this; } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java index d25ff8a..46d9a65 100644 --- 
a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java @@ -57,7 +57,7 @@ public class DorisCommitter { } public void commit(List dorisCommittables) { -if (dorisCommittables.isEmpty()) { +if (!dorisOptions.enable2PC() || dorisCommittables.isEmpty()) { return; } for (DorisCommittable dorisCommittable : dorisCommittables) { diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java index cdbc2a6..b26a735 100644 --- a/sr
(doris-flink-connector) branch master updated: [improve](test-case) modify ut for batch streamload (#396)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 386613d6 [improve](test-case) modify ut for batch streamload (#396) 386613d6 is described below commit 386613d66617436d977450922e3b1548df8aaf76 Author: wudi <676366...@qq.com> AuthorDate: Thu Jun 6 10:04:09 2024 +0800 [improve](test-case) modify ut for batch streamload (#396) --- .../org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java index 3ccef825..d73ff440 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java @@ -23,7 +23,6 @@ import org.apache.flink.util.function.SupplierWithException; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.exception.DorisBatchLoadException; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; @@ -119,7 +118,7 @@ public class TestDorisBatchStreamLoad { loader.flush("db.tbl", true); AtomicReference exception = loader.getException(); -Assert.assertEquals(exception.get().getClass(), DorisBatchLoadException.class); +Assert.assertTrue(exception.get() instanceof Exception); Assert.assertTrue(exception.get().getMessage().contains("stream load error")); } @@ -159,7 +158,7 @@ public class TestDorisBatchStreamLoad { 
AtomicReference exception = loader.getException(); -Assert.assertEquals(exception.get().getClass(), DorisBatchLoadException.class); +Assert.assertTrue(exception.get() instanceof Exception); Assert.assertTrue(exception.get().getMessage().contains("stream load error")); } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
svn commit: r69544 - /dev/doris/doris-shade/1.0.5/ /release/doris/doris-shade/1.0.5/
Author: diwu Date: Wed Jun 5 13:05:21 2024 New Revision: 69544 Log: move doris shade 1.0.5 to release Added: release/doris/doris-shade/1.0.5/ - copied from r69543, dev/doris/doris-shade/1.0.5/ Removed: dev/doris/doris-shade/1.0.5/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: throw exception when commit transaction failed (#206)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 7fddf0a throw exception when commit transaction failed (#206) 7fddf0a is described below commit 7fddf0aea9ef70caa42e8b4b993395a6383616f5 Author: zhaorongsheng AuthorDate: Wed Jun 5 15:44:17 2024 +0800 throw exception when commit transaction failed (#206) --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java| 7 --- .../src/main/scala/org/apache/doris/spark/load/StreamLoader.scala | 8 +--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 6b1708d..7f97516 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -328,10 +328,11 @@ public class DorisStreamLoad implements Serializable { String loadResult = EntityUtils.toString(response.getEntity()); Map res = MAPPER.readValue(loadResult, new TypeReference>() { }); -if (res.get("status").equals("Fail") && !ResponseUtil.isCommitted(res.get("msg"))) { -throw new StreamLoadException("Commit failed " + loadResult); +if (res.get("status").equals("Success") || ResponseUtil.isCommitted(res.get("msg"))) { +LOG.info("commit transaction {} succeed, load result: {}.", txnId, loadResult); } else { -LOG.info("load result {}", loadResult); +LOG.error("commit transaction {} failed. 
load result: {}", txnId, loadResult); +throw new StreamLoadException("Commit failed " + loadResult); } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 5986c08..9481b6f 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -151,10 +151,12 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader if (response.getEntity != null) { val loadResult = EntityUtils.toString(response.getEntity) val res = MAPPER.readValue(loadResult, new TypeReference[util.HashMap[String, String]]() {}) -if (res.get("status") == "Fail" && !ResponseUtil.isCommitted(res.get("msg"))) throw new StreamLoadException("Commit failed " + loadResult) -else LOG.info("load result {}", loadResult) +if (res.get("status") == "Success" || ResponseUtil.isCommitted(res.get("msg"))) LOG.info("commit transaction {} succeed, load result: {}.", msg.value, loadResult) +else { + LOG.error("commit transaction {} failed. load result: {}", msg.value, loadResult) + throw new StreamLoadException("Commit failed " + loadResult) +} } - } match { case Success(_) => client.close() case Failure(e) => - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [Improve]Optimize to determine whether table in doris exists (#26)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new dc02d7d [Improve]Optimize to determine whether table in doris exists (#26) dc02d7d is described below commit dc02d7d0b015bc76823965cddeee69d0406d3671 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jun 5 14:22:15 2024 +0800 [Improve]Optimize to determine whether table in doris exists (#26) --- .../connector/service/DorisSystemService.java | 28 -- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java index b627daf..3622400 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java @@ -22,8 +22,10 @@ package org.apache.doris.kafka.connector.service; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.function.Predicate; import org.apache.commons.compress.utils.Lists; import org.apache.doris.kafka.connector.cfg.DorisOptions; @@ -47,29 +49,31 @@ public class DorisSystemService { Collections.singletonList("information_schema"); public boolean tableExists(String database, String table) { -return databaseExists(database) && listTables(database).contains(table); +return listTables(database).contains(table); } public boolean databaseExists(String database) { return listDatabases().contains(database); } -public List listDatabases() { -return extractColumnValuesBySQL( -"SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;", -1, -dbName -> 
!builtinDatabases.contains(dbName)); +public Set listDatabases() { +return new HashSet<>( +extractColumnValuesBySQL( +"SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;", +1, +dbName -> !builtinDatabases.contains(dbName))); } -public List listTables(String databaseName) { +public Set listTables(String databaseName) { if (!databaseExists(databaseName)) { throw new DorisException("database" + databaseName + " is not exists"); } -return extractColumnValuesBySQL( -"SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", -1, -null, -databaseName); +return new HashSet<>( +extractColumnValuesBySQL( +"SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", +1, +null, +databaseName)); } public boolean isColumnExist(String database, String tableName, String columnName) { - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [improve](testcase) increase mongo cdc UT (#393)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 03faf5b4 [improve](testcase) increase mongo cdc UT (#393) 03faf5b4 is described below commit 03faf5b4f52aa1ffad168ef16b23a5cd803e432f Author: bingquanzhao AuthorDate: Tue Jun 4 20:40:24 2024 +0800 [improve](testcase) increase mongo cdc UT (#393) --- .../flink/tools/cdc/mongodb/MongoDBSchema.java | 4 +- .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 47 +++ .../flink/tools/cdc/mongodb/MongoDBTypeTest.java | 139 + .../tools/cdc/mongodb/MongoDateConverterTest.java | 32 + .../mongodb/MongoParsingProcessFunctionTest.java | 35 ++ 5 files changed, 256 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java index 2c2e1b48..41752c5e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.tools.cdc.mongodb; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.doris.flink.catalog.doris.DorisType; @@ -63,7 +64,8 @@ public class MongoDBSchema extends SourceSchema { return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL); } -private String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) { +@VisibleForTesting +protected String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) { FieldSchema existingField = fields.get(fieldName); if (existingField.getTypeString().startsWith(DorisType.DECIMAL)) { Tuple2 
existingPrecisionAndScale = diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java new file mode 100644 index ..57f7f470 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.tools.cdc.mongodb; + +import org.bson.Document; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; + +public class MongoDBSchemaTest { + +@Test +public void convertToDorisType() {} + +@Test +public void getCdcTableName() throws Exception { +MongoDBSchema mongoDBSchema = +new MongoDBSchema(new ArrayList(), "db_TEST", "test_table", ""); +assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName()); +} + +@Test +public void replaceDecimalTypeIfNeeded() throws Exception { +ArrayList documents = new ArrayList<>(); +documents.add(new Document("fields1", 1234567.66)); +MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); +String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)"); +assertEquals("DECIMAL(15,8)", d); +} +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java new file mode 100644 index ..ee511ce2 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributo
(doris-kafka-connector) branch master updated: [feature]Support reading protobuf serialized data (#23)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 086bf90 [feature]Support reading protobuf serialized data (#23) 086bf90 is described below commit 086bf9016605a11d8f518464836d72d13be94a24 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Jun 4 11:30:13 2024 +0800 [feature]Support reading protobuf serialized data (#23) --- pom.xml | 11 +++ 1 file changed, 11 insertions(+) diff --git a/pom.xml b/pom.xml index 282650f..ddbeb2f 100644 --- a/pom.xml +++ b/pom.xml @@ -265,6 +265,17 @@ kafka-connect-avro-converter ${confluent.version} + +io.confluent +kafka-protobuf-serializer +${confluent.version} + + +io.confluent +kafka-connect-protobuf-converter +${confluent.version} + + io.debezium - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [improve] Add a check for column existence when adding a new column. (#20)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 248db01 [improve] Add a check for column existence when adding a new column. (#20) 248db01 is described below commit 248db0198f760417a3c7cf319518808618218aa1 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jun 4 11:29:50 2024 +0800 [improve] Add a check for column existence when adding a new column. (#20) --- .../connector/converter/schema/SchemaChangeManager.java | 17 - .../kafka/connector/service/DorisSystemService.java | 9 + 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java index 376edf9..3086adb 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java @@ -19,6 +19,8 @@ package org.apache.doris.kafka.connector.converter.schema; +import static java.net.HttpURLConnection.HTTP_OK; + import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.Serializable; @@ -30,6 +32,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.converter.RecordDescriptor; import org.apache.doris.kafka.connector.exception.SchemaChangeException; +import org.apache.doris.kafka.connector.service.DorisSystemService; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -48,9 +51,11 @@ public class SchemaChangeManager implements Serializable { private static final 
String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private final ObjectMapper objectMapper = new ObjectMapper(); private final DorisOptions dorisOptions; +private DorisSystemService dorisSystemService; public SchemaChangeManager(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; +this.dorisSystemService = new DorisSystemService(dorisOptions); } private boolean handleSchemaChange(Map responseMap, String responseEntity) { @@ -64,6 +69,16 @@ public class SchemaChangeManager implements Serializable { public void addColumnDDL(String tableName, RecordDescriptor.FieldDescriptor field) { try { +// check the add column whether exist in table. +if (dorisSystemService.isColumnExist( +dorisOptions.getDatabase(), tableName, field.getName())) { +LOG.warn( +"The column {} already exists in table {}, no need to add it again", +field.getName(), +tableName); +return; +} + String addColumnDDL = buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field); boolean status = execute(addColumnDDL, dorisOptions.getDatabase()); LOG.info( @@ -146,7 +161,7 @@ public class SchemaChangeManager implements Serializable { CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); final String reasonPhrase = response.getStatusLine().getReasonPhrase(); -if (statusCode == 200 && response.getEntity() != null) { +if (statusCode == HTTP_OK && response.getEntity() != null) { responseEntity = EntityUtils.toString(response.getEntity()); return objectMapper.readValue(responseEntity, Map.class); } else { diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java index 2365aa4..b627daf 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java @@ -35,6 +35,8 @@ import 
org.slf4j.LoggerFactory; public class DorisSystemService { private static final Logger LOG = LoggerFactory.getLogger(DorisSystemService.class); +private static final String GET_COLUMN_EXISTS_TEMPLATE = +"SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?"; private final JdbcConnectionProvider jdbcConnectionProvider; public DorisSystemService(DorisOptions dorisOptions) { @@ -70,6 +72,13 @@ public class Dor
(doris-kafka-connector) branch master updated: [Improve]schema.evolution adds debezium prefix (#24)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new fc4c8c8 [Improve]schema.evolution adds debezium prefix (#24) fc4c8c8 is described below commit fc4c8c8b392ad979621a58124546018520930aa6 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Mon Jun 3 18:05:18 2024 +0800 [Improve]schema.evolution adds debezium prefix (#24) --- src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java | 4 ++-- .../apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java | 5 +++-- .../apache/doris/kafka/connector/converter/TestRecordService.java| 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 4596f69..5747925 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -95,8 +95,8 @@ public class DorisOptions { this.schemaEvolutionMode = SchemaEvolutionMode.of( config.getOrDefault( -DorisSinkConnectorConfig.SCHEMA_EVOLUTION, - DorisSinkConnectorConfig.SCHEMA_EVOLUTION_DEFAULT)); + DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION, + DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT)); this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES)); this.recordNum = diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index 5c33da4..f4cabb3 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -82,8 
+82,9 @@ public class DorisSinkConnectorConfig { // Prefix for Doris StreamLoad specific properties. public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; -public static final String SCHEMA_EVOLUTION = "schema.evolution"; -public static final String SCHEMA_EVOLUTION_DEFAULT = SchemaEvolutionMode.NONE.getName(); +public static final String DEBEZIUM_SCHEMA_EVOLUTION = "debezium.schema.evolution"; +public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT = +SchemaEvolutionMode.NONE.getName(); // metrics public static final String JMX_OPT = "jmx"; diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java index f80a737..0142259 100644 --- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java +++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java @@ -72,7 +72,7 @@ public class TestRecordService { props.load(stream); props.put("task_id", "1"); props.put("converter.mode", "debezium_ingestion"); -props.put("schema.evolution", "basic"); +props.put("debezium.schema.evolution", "basic"); props.put( "doris.topic2table.map", "avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal"); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [fix]Fix DateTimeUtils, SchemaUtils referenced packages from debezium-connector-jdbc (#22)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 5139d51 [fix]Fix DateTimeUtils, SchemaUtils referenced packages from debezium-connector-jdbc (#22) 5139d51 is described below commit 5139d51f904d5c4ae1b23977da2e9a20cfb9ee33 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed May 29 17:54:19 2024 +0800 [fix]Fix DateTimeUtils, SchemaUtils referenced packages from debezium-connector-jdbc (#22) --- .../org/apache/doris/kafka/connector/converter/RecordService.java | 4 ++-- .../org/apache/doris/kafka/connector/converter/type/AbstractType.java | 2 +- .../doris/kafka/connector/converter/type/connect/ConnectDateType.java | 2 +- .../doris/kafka/connector/converter/type/connect/ConnectTimeType.java | 2 +- .../kafka/connector/converter/type/connect/ConnectTimestampType.java | 2 +- .../doris/kafka/connector/converter/type/debezium/DateType.java | 2 +- .../doris/kafka/connector/converter/type/debezium/MicroTimeType.java | 2 +- .../kafka/connector/converter/type/debezium/MicroTimestampType.java | 2 +- .../doris/kafka/connector/converter/type/debezium/NanoTimeType.java | 2 +- .../kafka/connector/converter/type/debezium/NanoTimestampType.java| 2 +- .../doris/kafka/connector/converter/type/debezium/TimeType.java | 2 +- .../doris/kafka/connector/converter/type}/util/DateTimeUtils.java | 3 ++- .../doris/kafka/connector/converter/type}/util/SchemaUtils.java | 3 ++- 13 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java index 9487307..cd76c42 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java +++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java @@ -126,9 +126,9 @@ public class RecordService { private void validate(SinkRecord record) { if (isSchemaChange(record)) { LOG.warn( -"Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic."); +"Schema change records are not supported by doris-kafka-connector. Adjust `topics` or `topics.regex` to exclude schema change topic."); throw new DorisException( -"Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic."); +"Schema change records are not supported by doris-kafka-connector. Adjust `topics` or `topics.regex` to exclude schema change topic."); } } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java index 650e792..8bc6b54 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java @@ -18,10 +18,10 @@ */ package org.apache.doris.kafka.connector.converter.type; -import io.debezium.connector.jdbc.util.SchemaUtils; import java.util.Objects; import java.util.Optional; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.converter.type.util.SchemaUtils; import org.apache.kafka.connect.data.Schema; /** An abstract implementation of {@link Type}, which all types should extend. 
*/ diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java index acac4af..b4dbdbe 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java @@ -18,8 +18,8 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; -import io.debezium.connector.jdbc.util.DateTimeUtils; import org.apache.doris.kafka.connector.converter.type.AbstractDateType; +import org.apache.doris.kafka.connector.converter.type.util.DateTimeUtils; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.errors.ConnectException; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java index c2e1698..b7a7834 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java +++ b/src/main/java/org/apach
(doris-flink-connector) branch master updated: [Improve] remove jupiter5 dependency for testcase (#391)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 5236a6d [Improve] remove jupiter5 dependency for testcase (#391) 5236a6d is described below commit 5236a6d51959295626a7021f320d5926eadcb965 Author: wudi <676366...@qq.com> AuthorDate: Tue May 28 20:36:53 2024 +0800 [Improve] remove jupiter5 dependency for testcase (#391) --- flink-doris-connector/pom.xml | 7 -- .../java/org/apache/doris/flink/DorisTestBase.java | 25 ++ .../apache/doris/flink/sink/DorisSinkITCase.java | 74 +++--- .../doris/flink/source/DorisSourceITCase.java | 15 ++-- .../doris/flink/tools/cdc/DorisDorisE2ECase.java | 5 +- .../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 90 +- 6 files changed, 63 insertions(+), 153 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 23145e4..db05e2e 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -90,7 +90,6 @@ under the License. 1.7.25 4.2.0 1.17.6 -5.10.1 4.11 1.3 @@ -330,12 +329,6 @@ under the License. 
${junit.version} test - -org.junit.jupiter -junit-jupiter-api -${junit-jupiter.version} -test - org.testcontainers testcontainers diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java index e0f75f0..8141eaf 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -18,6 +18,8 @@ package org.apache.doris.flink; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; @@ -177,4 +179,27 @@ public abstract class DorisTestBase { } return list; } + +public void checkResult(List expected, String query, int columnSize) throws Exception { +List actual = new ArrayList<>(); +try (Connection connection = +DriverManager.getConnection( +String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); +Statement statement = connection.createStatement()) { +ResultSet sinkResultSet = statement.executeQuery(query); +while (sinkResultSet.next()) { +List row = new ArrayList<>(); +for (int i = 1; i <= columnSize; i++) { +Object value = sinkResultSet.getObject(i); +if (value == null) { +row.add("null"); +} else { +row.add(value.toString()); +} +} +actual.add(StringUtils.join(row, ",")); +} +} +Assert.assertArrayEquals(expected.toArray(), actual.toArray()); +} } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index c9501d3..ad6923c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -28,22 +28,16 @@ import 
org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.junit.Test; -import org.junit.jupiter.api.Assertions; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; import java.sql.Statement; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** DorisSink ITCase with csv and arrow format. */ public class DorisSinkITCase extends DorisTestBase { @@ -62,25 +56,9 @@ public class DorisSinkITCase extends DorisTestBase { submitJob(TABLE_CSV, properties, new String[] {"doris,1"}); Thread.sleep(1); -Set> actual = new HashSet<>(); - -try (Connection connection = -DriverManager.getConnection( -
(doris-kafka-connector) branch master updated: [fix] fix Enum data type mapping null pointer (#21)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 1a8d34a [fix] fix Enum data type mapping null pointer (#21) 1a8d34a is described below commit 1a8d34a76702387365580a04ae970c841ba20518 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue May 28 14:14:08 2024 +0800 [fix] fix Enum data type mapping null pointer (#21) --- .../apache/doris/kafka/connector/converter/RecordDescriptor.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java index c4f7243..47c34b2 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java @@ -134,9 +134,12 @@ public class RecordDescriptor { this.schemaName = schema.name(); this.schemaTypeName = schema.type().name(); this.type = -Objects.nonNull(schema.name()) -? typeRegistry.get(schema.name()) -: typeRegistry.get(schema.type().name()); +typeRegistry.getOrDefault( +schema.name(), typeRegistry.get(schema.type().name())); +if (this.type == null) { +throw new IllegalArgumentException( +"Type not found in registry for schema: " + schema); +} this.typeName = type.getTypeName(schema); } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [improve](cdc) support synchronization of PostgreSQL partitioned tables (#389)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 4ed08f4 [improve](cdc) support synchronization of PostgreSQL partitioned tables (#389) 4ed08f4 is described below commit 4ed08f4f00fcdc9465aa0ca8516866ebc67045b5 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Thu May 23 13:59:23 2024 +0800 [improve](cdc) support synchronization of PostgreSQL partitioned tables (#389) --- .../doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 490fdbc..c9387c1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -104,7 +104,11 @@ public class PostgresDatabaseSync extends DatabaseSync { try (Connection conn = getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); try (ResultSet tables = -metaData.getTables(databaseName, schemaName, "%", new String[] {"TABLE"})) { +metaData.getTables( +databaseName, +schemaName, +"%", +new String[] {"TABLE", "PARTITIONED TABLE"})) { while (tables.next()) { String tableName = tables.getString("TABLE_NAME"); String tableComment = tables.getString("REMARKS"); @@ -115,7 +119,7 @@ public class PostgresDatabaseSync extends DatabaseSync { new PostgresSchema( metaData, databaseName, schemaName, tableName, tableComment); sourceSchema.setModel( -sourceSchema.primaryKeys.size() > 0 +!sourceSchema.primaryKeys.isEmpty() ? 
DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [optimize](sink) Optimize the BE load balancing logic during concurrent imports. (#388)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new d9f52e6 [optimize](sink) Optimize the BE load balancing logic during concurrent imports. (#388) d9f52e6 is described below commit d9f52e6774a2aafa1c1274d2048afdae86f0e39e Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue May 21 17:35:22 2024 +0800 [optimize](sink) Optimize the BE load balancing logic during concurrent imports. (#388) --- .../src/main/java/org/apache/doris/flink/sink/BackendUtil.java| 8 +++- .../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 6 +++--- .../test/java/org/apache/doris/flink/sink/TestBackendUtil.java| 4 +--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index cb5b6f2..9296ae5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -76,9 +76,15 @@ public class BackendUtil { } public String getAvailableBackend() { +return getAvailableBackend(0); +} + +public String getAvailableBackend(int subtaskId) { long tmp = pos + backends.size(); while (pos < tmp) { -BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % backends.size())); +BackendV2.BackendRowV2 backend = +backends.get((int) ((pos + subtaskId) % backends.size())); +pos++; String res = backend.toBackendString(); if (tryHttpConnection(res)) { return res; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 08edfc7..81bfe88 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -312,7 +312,7 @@ public class DorisWriter List writerStates = new ArrayList<>(); for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { // Dynamic refresh backend -dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId)); DorisWriterState writerState = new DorisWriterState( labelPrefix, @@ -340,7 +340,7 @@ public class DorisWriter tableKey, v -> new DorisStreamLoad( -backendUtil.getAvailableBackend(), +backendUtil.getAvailableBackend(subtaskId), dorisOptions, executionOptions, labelGenerator, @@ -373,7 +373,7 @@ public class DorisWriter // use send cached data to new txn, then notify to restart the stream if (executionOptions.isUseCache()) { try { - dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId)); if (executionOptions.enabled2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java index 8a780ff..ece59c7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java @@ -22,7 +22,6 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,8 +44,7 @@ public class TestBackendUtil { @Test public void testTryHttpConnection() { -BackendUtil backendUtil = new BackendUtil(new ArrayList<>()); -boolean flag = backendUtil.tryHttpConnection("127.0.0.1:8040"); +boolean flag = 
BackendUtil.tryHttpConnection("127.0.0.1:8040"); Assert.assertFalse(flag); } - To unsubscribe, e-mail: commits-unsubscr...@dori
svn commit: r69293 - /dev/doris/spark-connector/1.3.2/ /release/doris/spark-connector/1.3.2/
Author: diwu Date: Mon May 20 13:07:21 2024 New Revision: 69293 Log: move doris spark connector 1.3.2 to release Added: release/doris/spark-connector/1.3.2/ - copied from r69292, dev/doris/spark-connector/1.3.2/ Removed: dev/doris/spark-connector/1.3.2/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
svn commit: r69292 - /dev/doris/flink-connector/1.6.1/ /release/doris/flink-connector/1.6.1/
Author: diwu Date: Mon May 20 13:07:13 2024 New Revision: 69292 Log: move doris flink connector 1.6.1 to release Added: release/doris/flink-connector/1.6.1/ - copied from r69291, dev/doris/flink-connector/1.6.1/ Removed: dev/doris/flink-connector/1.6.1/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [Improve]Improve the length of generated stream load label (#18)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 083b343 [Improve]Improve the length of generated stream load label (#18) 083b343 is described below commit 083b343cbf880a90b60dffab3ff7397b34f7ae22 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri May 17 16:36:56 2024 +0800 [Improve]Improve the length of generated stream load label (#18) --- .../doris/kafka/connector/utils/FileNameUtils.java | 2 +- .../kafka/connector/writer/LabelGenerator.java | 39 +++--- .../kafka/connector/writer/StreamLoadWriter.java | 11 +- .../connector/writer/TestStreamLoadWriter.java | 8 ++--- 4 files changed, 18 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java index 052f03c..8c00f43 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java @@ -89,7 +89,7 @@ public class FileNameUtils { } public static long labelToEndOffset(String label) { -return Long.parseLong(readFromFileName(label, 5)); +return Long.parseLong(readFromFileName(label, 3)); } /** diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java index 349db71..daca8b1 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java @@ -19,55 +19,40 @@ package org.apache.doris.kafka.connector.writer; -import java.util.UUID; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; /** Generator label for stream load. 
*/ public class LabelGenerator { -private final String labelPrefix; private String topic; private int partition; -private final boolean enable2PC; private String tableIdentifier; -private int subtaskId; +// The label of doris stream load cannot be repeated when loading. +// Under special circumstances (usually load failure) when doris-kafka-connector is started, +// stream load is performed at the same offset every time, which will cause label duplication. +// For this reason, we use labelRandomSuffix to generate a random suffix at startup. +private final AtomicLong labelRandomSuffix; -public LabelGenerator(String labelPrefix, boolean enable2PC) { -this.labelPrefix = labelPrefix; -this.enable2PC = enable2PC; -} - -public LabelGenerator( -String labelPrefix, -boolean enable2PC, -String topic, -int partition, -String tableIdentifier, -int subtaskId) { -this(labelPrefix, enable2PC); +public LabelGenerator(String topic, int partition, String tableIdentifier) { // The label of stream load can not contain `.` this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_"); this.topic = topic.replaceAll("\\.", "_"); -this.subtaskId = subtaskId; this.partition = partition; +Random random = new Random(); +labelRandomSuffix = new AtomicLong(random.nextInt(1000)); } public String generateLabel(long lastOffset) { StringBuilder sb = new StringBuilder(); -sb.append(labelPrefix) -.append(LoadConstants.FILE_DELIM_DEFAULT) -.append(topic) +sb.append(topic) .append(LoadConstants.FILE_DELIM_DEFAULT) .append(partition) .append(LoadConstants.FILE_DELIM_DEFAULT) .append(tableIdentifier) .append(LoadConstants.FILE_DELIM_DEFAULT) -.append(subtaskId) -.append(LoadConstants.FILE_DELIM_DEFAULT) .append(lastOffset) .append(LoadConstants.FILE_DELIM_DEFAULT) -.append(System.currentTimeMillis()); -if (!enable2PC) { - sb.append(LoadConstants.FILE_DELIM_DEFAULT).append(UUID.randomUUID()); -} +.append(labelRandomSuffix.getAndIncrement()); return sb.toString(); } } diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java index a8fc00a..54f102a 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java @@ -62,14
(doris-kafka-connector) branch master updated: [Improve]add cache for schema change fetch doris table (#19)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 67aa079 [Improve]add cache for schema change fetch doris table (#19) 67aa079 is described below commit 67aa07991352307b136958331997468ab4fcf6dc Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri May 17 16:36:42 2024 +0800 [Improve]add cache for schema change fetch doris table (#19) --- .../kafka/connector/converter/RecordService.java | 29 +++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java index 8bc2480..9487307 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java @@ -66,6 +66,7 @@ public class RecordService { private DorisOptions dorisOptions; private RecordTypeRegister recordTypeRegister; private Set missingFields; +private Map dorisTableDescriptorCache; public RecordService() { this.converter = new JsonConverter(); @@ -80,6 +81,7 @@ public class RecordService { this.recordTypeRegister = new RecordTypeRegister(dorisOptions); this.dorisSystemService = new DorisSystemService(dorisOptions); this.schemaChangeManager = new SchemaChangeManager(dorisOptions); +this.dorisTableDescriptorCache = new HashMap<>(); } /** @@ -140,6 +142,9 @@ public class RecordService { String tableName, RecordDescriptor recordDescriptor) { if (!hasTable(tableName)) { // TODO Table does not exist, lets attempt to create it. 
+LOG.warn("The {} table does not exist, please create it manually.", tableName); +throw new DorisException( +"The " + tableName + " table does not exist, please create it manually."); } else { // Table exists, lets attempt to alter it if necessary. alterTableIfNeeded(tableName, recordDescriptor); @@ -147,12 +152,19 @@ public class RecordService { } private boolean hasTable(String tableName) { -return dorisSystemService.tableExists(dorisOptions.getDatabase(), tableName); +if (!dorisTableDescriptorCache.containsKey(tableName)) { +boolean exist = dorisSystemService.tableExists(dorisOptions.getDatabase(), tableName); +if (exist) { +dorisTableDescriptorCache.put(tableName, null); +} +return exist; +} +return true; } private void alterTableIfNeeded(String tableName, RecordDescriptor record) { // Resolve table metadata from the database -final TableDescriptor table = obtainTableSchema(tableName); +final TableDescriptor table = fetchDorisTableDescriptor(tableName); missingFields = resolveMissingFields(record, table); if (missingFields.isEmpty()) { @@ -177,6 +189,8 @@ public class RecordService { for (RecordDescriptor.FieldDescriptor missingField : missingFields) { schemaChangeManager.addColumnDDL(tableName, missingField); } +TableDescriptor newTableDescriptor = obtainTableSchema(tableName); +dorisTableDescriptorCache.put(tableName, newTableDescriptor); } private Set resolveMissingFields( @@ -192,9 +206,16 @@ public class RecordService { return missingFields; } +private TableDescriptor fetchDorisTableDescriptor(String tableName) { +if (!dorisTableDescriptorCache.containsKey(tableName) +|| Objects.isNull(dorisTableDescriptorCache.get(tableName))) { +TableDescriptor tableDescriptor = obtainTableSchema(tableName); +dorisTableDescriptorCache.put(tableName, tableDescriptor); +} +return dorisTableDescriptorCache.get(tableName); +} + private TableDescriptor obtainTableSchema(String tableName) { -// TODO when the table structure is obtained from doris for first time, it should be 
-// obtained in the cache later. Schema schema = RestService.getSchema(dorisOptions, dorisOptions.getDatabase(), tableName, LOG); List columnDescriptors = new ArrayList<>(); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-kafka-connector) branch master updated: [Improve]The change basis of table schema is changed to parse data column field (#17)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git The following commit(s) were added to refs/heads/master by this push: new 415f37a [Improve]The change basis of table schema is changed to parse data column field (#17) 415f37a is described below commit 415f37a7aa2f6bf27c969c8363499edd0399cfd0 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu May 16 14:05:32 2024 +0800 [Improve]The change basis of table schema is changed to parse data column field (#17) --- .../connector/jdbc/util}/DateTimeUtils.java| 6 +- .../debezium/connector/jdbc/util/SchemaUtils.java | 58 + .../doris/kafka/connector/cfg/DorisOptions.java| 17 +- .../connector/cfg/DorisSinkConnectorConfig.java| 5 +- .../connector/converter/RecordDescriptor.java | 67 ++--- .../kafka/connector/converter/RecordService.java | 148 ++- .../schema/SchemaChangeManager.java| 84 -- .../SchemaEvolutionMode.java} | 24 +- .../connector/converter/type/AbstractDateType.java | 11 +- .../connector/converter/type/AbstractTimeType.java | 28 +- .../converter/type/AbstractTimestampType.java | 23 +- .../connector/converter/type/AbstractType.java | 23 ++ .../doris/kafka/connector/converter/type/Type.java | 3 + .../converter/type/connect/ConnectBooleanType.java | 8 + .../converter/type/connect/ConnectBytesType.java | 7 + .../converter/type/connect/ConnectDateType.java| 2 +- .../converter/type/connect/ConnectDecimalType.java | 13 + .../converter/type/connect/ConnectFloat32Type.java | 8 + .../converter/type/connect/ConnectFloat64Type.java | 8 + .../converter/type/connect/ConnectInt16Type.java | 8 + .../converter/type/connect/ConnectInt32Type.java | 8 + .../converter/type/connect/ConnectInt64Type.java | 8 + .../converter/type/connect/ConnectInt8Type.java| 8 + .../connect/ConnectMapToConnectStringType.java | 6 + .../converter/type/connect/ConnectStringType.java | 19 ++ 
.../converter/type/connect/ConnectTimeType.java| 2 +- .../type/connect/ConnectTimestampType.java | 2 +- .../converter/type/debezium/DateType.java | 2 +- .../converter/type/debezium/MicroTimeType.java | 2 +- .../type/debezium/MicroTimestampType.java | 2 +- .../converter/type/debezium/NanoTimeType.java | 2 +- .../converter/type/debezium/NanoTimestampType.java | 2 +- .../converter/type/debezium/TimeType.java | 2 +- .../type/debezium/VariableScaleDecimalType.java| 12 + .../type/doris}/DorisType.java | 5 +- .../type/doris/DorisTypeProperties.java} | 8 +- .../kafka/connector/dialect/mysql/MysqlType.java | 213 --- .../connector/service/DorisDefaultSinkService.java | 26 +- .../kafka/connector/utils/ConfigCheckUtils.java| 29 --- .../writer/schema/DebeziumSchemaChange.java| 289 - .../writer/schema/SchemaChangeHelper.java | 159 .../connector/converter/TestRecordService.java | 99 ++- .../connector/writer/TestDebeziumSchemaChange.java | 133 -- .../kafka/connector/writer/TestRecordBuffer.java | 11 +- 44 files changed, 640 insertions(+), 960 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java similarity index 95% rename from src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java rename to src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java index 09ee9d0..941254d 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java +++ b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java @@ -15,8 +15,12 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
+ * + * Copied from + * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java */ -package org.apache.doris.kafka.connector.converter.utils; + +package io.debezium.connector.jdbc.util; import io.debezium.time.Conversions; import java.sql.Timestamp; diff --git a/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java new file mode 100644 index 000..178507c --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright own
svn commit: r69168 - in /dev/doris/flink-connector/1.6.1: ./ apache-doris-flink-connector-1.6.1-src.tar.gz apache-doris-flink-connector-1.6.1-src.tar.gz.asc apache-doris-flink-connector-1.6.1-src.tar.gz.sha512
Author: diwu Date: Wed May 15 09:43:54 2024 New Revision: 69168 Log: flink connector 1.6.1 Added: dev/doris/flink-connector/1.6.1/ dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz (with props) dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.asc (with props) dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.sha512 Added: dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz == Binary file - no diff available. Propchange: dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz -- svn:mime-type = application/x-gzip Added: dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.asc == Binary file - no diff available. Propchange: dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.sha512 == --- dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.sha512 (added) +++ dev/doris/flink-connector/1.6.1/apache-doris-flink-connector-1.6.1-src.tar.gz.sha512 Wed May 15 09:43:54 2024 @@ -0,0 +1 @@ +0042ade26dc613af5b856a5fc37c223ed056546b94e92e120cfa774b1740030a9e3c6c5a33f0d8949c5f959092dea46a4fa9f93c35f9f35caae94101832dd4e4 apache-doris-flink-connector-1.6.1-src.tar.gz - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) 01/01: Commit for release 1.6.1
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to tag 1.6.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git commit 1907b77137fb5d9940fba085862208a4df557b00 Author: wudi <676366...@qq.com> AuthorDate: Wed May 15 16:33:51 2024 +0800 Commit for release 1.6.1 --- flink-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 23145e4..fe23ca8 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -67,7 +67,7 @@ under the License. -1.6.1-SNAPSHOT +1.6.1 1.18.0 1.18 2.4.2 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) tag 1.6.1 created (now 1907b77)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.6.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at 1907b77 (commit) This tag includes the following new commits: new 1907b77 Commit for release 1.6.1 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch release-1.6.1 created (now e49f7e4)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.6.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at e49f7e4 [fix][cdc] fix uid conflicts during multi-database synchronization. (#382) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org