This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20465779f7b [fix](routine load) fix enclose and escape can not set in
routine load job (#38402)
20465779f7b is described below
commit 20465779f7b0301c65edd7409401d0c45a43959c
Author: hui lai <[email protected]>
AuthorDate: Tue Jul 30 14:45:24 2024 +0800
[fix](routine load) fix enclose and escape can not set in routine load job
(#38402)
---
.../doris/analysis/AlterRoutineLoadStmt.java | 8 +
.../doris/analysis/CreateRoutineLoadStmt.java | 32 ++--
.../doris/load/routineload/RoutineLoadJob.java | 10 +-
.../routine_load/test_routine_load_property.out | 10 ++
.../routine_load/data/test_enclose_and_escape0.csv | 1 +
.../routine_load/data/test_enclose_and_escape1.csv | 1 +
.../routine_load/test_routine_load_property.groovy | 187 +++++++++++++++++++++
7 files changed, 235 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 67c103ed02c..d2a0844dfb4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -68,6 +68,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.add(CreateRoutineLoadStmt.WORKLOAD_GROUP)
+ .add(LoadStmt.KEY_ENCLOSE)
+ .add(LoadStmt.KEY_ESCAPE)
.build();
private final LabelName labelName;
@@ -250,6 +252,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
.getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup);
analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP,
String.valueOf(wgId));
}
+ if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) {
+ analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE,
jobProperties.get(LoadStmt.KEY_ENCLOSE));
+ }
+ if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) {
+ analyzedJobProperties.put(LoadStmt.KEY_ESCAPE,
jobProperties.get(LoadStmt.KEY_ESCAPE));
+ }
}
private void checkDataSourceProperties() throws UserException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 95434a1fd19..13654509821 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -141,6 +141,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
.add(WORKLOAD_GROUP)
+ .add(LoadStmt.KEY_ENCLOSE)
+ .add(LoadStmt.KEY_ESCAPE)
.build();
private final LabelName labelName;
@@ -178,9 +180,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private boolean numAsString = false;
private boolean fuzzyParse = false;
- private String enclose;
+ private byte enclose;
- private String escape;
+ private byte escape;
private long workloadGroupId = -1;
@@ -311,11 +313,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return jsonPaths;
}
- public String getEnclose() {
+ public byte getEnclose() {
return enclose;
}
- public String getEscape() {
+ public byte getEscape() {
return escape;
}
@@ -507,14 +509,24 @@ public class CreateRoutineLoadStmt extends DdlStmt {
loadToSingleTablet =
Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
- enclose = jobProperties.get(LoadStmt.KEY_ENCLOSE);
- if (enclose != null && enclose.length() != 1) {
- throw new AnalysisException("enclose must be single-char");
+
+ String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
+ if (encloseStr != null) {
+ if (encloseStr.length() != 1) {
+ throw new AnalysisException("enclose must be single-char");
+ } else {
+ enclose = encloseStr.getBytes()[0];
+ }
}
- escape = jobProperties.get(LoadStmt.KEY_ESCAPE);
- if (escape != null && escape.length() != 1) {
- throw new AnalysisException("escape must be single-char");
+ String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
+ if (escapeStr != null) {
+ if (escapeStr.length() != 1) {
+ throw new AnalysisException("escape must be single-char");
+ } else {
+ escape = escapeStr.getBytes()[0];
+ }
}
+
String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index cc503df5880..a25cd999858 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -424,11 +424,13 @@ public abstract class RoutineLoadJob
} else {
jobProperties.put(PROPS_FUZZY_PARSE, "false");
}
- if (stmt.getEnclose() != null) {
- jobProperties.put(LoadStmt.KEY_ENCLOSE, stmt.getEnclose());
+ if (String.valueOf(stmt.getEnclose()) != null) {
+ this.enclose = stmt.getEnclose();
+ jobProperties.put(LoadStmt.KEY_ENCLOSE,
String.valueOf(stmt.getEnclose()));
}
- if (stmt.getEscape() != null) {
- jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
+ if (String.valueOf(stmt.getEscape()) != null) {
+ this.escape = stmt.getEscape();
+ jobProperties.put(LoadStmt.KEY_ESCAPE,
String.valueOf(stmt.getEscape()));
}
if (stmt.getWorkloadGroupId() > 0) {
jobProperties.put(WORKLOAD_GROUP,
String.valueOf(stmt.getWorkloadGroupId()));
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_property.out
b/regression-test/data/load_p0/routine_load/test_routine_load_property.out
new file mode 100644
index 00000000000..62fdfe4fcf3
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_property.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_enclose_and_escape --
+1 ab,ced 2023-07-15 de 2023-07-20T05:48:31 "ghi"
+
+-- !sql_enclose_and_escape_resume --
+1 ab,ced 2023-07-15 de 2023-07-20T05:48:31 "ghi"
+
+-- !sql_enclose_and_escape_multi_table --
+1 ab,ced 2023-07-15 de 2023-07-20T05:48:31 "ghi"
+
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv
new file mode 100644
index 00000000000..0d6530d5686
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv
@@ -0,0 +1 @@
+1,eab,cfede,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv
new file mode 100644
index 00000000000..8cb2bed0210
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv
@@ -0,0 +1 @@
+1,eab,gfegdg,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
new file mode 100644
index 00000000000..9cc1fa0d2d9
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_property","p0") {
+ // send data to Kafka
+ def kafkaCsvTpoics = [
+ "test_enclose_and_escape0",
+ "test_enclose_and_escape1",
+ ]
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ // test create routine load job with enclose and escape
+ def tableName = "test_routine_load_with_enclose_and_escape"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def jobName = "test_enclose_and_escape"
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "enclose" = "e",
+ "escape" = "f",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql_enclose_and_escape "select * from ${tableName} order by k1"
+
+ sql "pause routine load for ${jobName}"
+ def res = sql "show routine load for ${jobName}"
+ log.info("routine load job properties:
${res[0][11].toString()}".toString())
+ sql "ALTER ROUTINE LOAD FOR ${jobName} PROPERTIES(\"enclose\" =
\"g\");"
+ sql "resume routine load for ${jobName}"
+ count = 0
+ while (true) {
+ res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql_enclose_and_escape_resume "select * from ${tableName} order
by k1"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200",
+ "enclose" = "e",
+ "escape" = "f"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+
+ qt_sql_enclose_and_escape_multi_table "select * from ${tableName}
order by k1"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]