This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 93775369983 branch-3.0: [fix](Export) modify some cases of export
feature #47976 (#48061)
93775369983 is described below
commit 937753699836c3da3a60e21e2811b220d86d9a7d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 24 11:33:14 2025 +0800
branch-3.0: [fix](Export) modify some cases of export feature #47976
(#48061)
Cherry-picked from #47976
Co-authored-by: Tiewei Fang <[email protected]>
---
.../data/export_p0/test_export_basic.out | Bin 7131 -> 7131 bytes
.../org/apache/doris/regression/suite/Suite.groovy | 56 ++++
.../suites/export_p0/test_export_basic.groovy | 304 +++++++-----------
.../suites/export_p0/test_export_csv.groovy | 208 +++++--------
.../export_p0/test_export_data_consistency.groovy | 69 ++---
.../export_p0/test_export_empty_table.groovy | 142 ++++++---
.../test_export_table_with_label_retry.groovy | 80 ++---
.../suites/export_p0/test_export_view.groovy | 327 ++++++++------------
.../export/test_export_external_table.groovy | 341 ++++++++-------------
9 files changed, 665 insertions(+), 862 deletions(-)
diff --git a/regression-test/data/export_p0/test_export_basic.out
b/regression-test/data/export_p0/test_export_basic.out
index 52aa765fc33..e9614760abd 100644
Binary files a/regression-test/data/export_p0/test_export_basic.out and
b/regression-test/data/export_p0/test_export_basic.out differ
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 111c472a364..384c94c4bfb 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1054,6 +1054,62 @@ class Suite implements GroovyInterceptable {
Assert.assertEquals(0, code)
}
+ void mkdirRemotePathOnAllBE(String username, String path) {
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+
+ def executeCommand = { String cmd, Boolean mustSuc ->
+ try {
+ staticLogger.info("execute ${cmd}")
+ def proc = new ProcessBuilder("/bin/bash", "-c",
cmd).redirectErrorStream(true).start()
+ int exitcode = proc.waitFor()
+ if (exitcode != 0) {
+ staticLogger.info("exit code: ${exitcode}, output\n:
${proc.text}")
+ if (mustSuc == true) {
+ Assert.assertEquals(0, exitcode)
+ }
+ }
+ } catch (IOException e) {
+ Assert.assertTrue(false, "execute timeout")
+ }
+ }
+
+ ipList.each { beid, ip ->
+ String cmd = "ssh -o StrictHostKeyChecking=no ${username}@${ip}
\"mkdir -p ${path}\""
+ logger.info("Execute: ${cmd}".toString())
+ executeCommand(cmd, false)
+ }
+ }
+
+ void deleteRemotePathOnAllBE(String username, String path) {
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+
+ def executeCommand = { String cmd, Boolean mustSuc ->
+ try {
+ staticLogger.info("execute ${cmd}")
+ def proc = new ProcessBuilder("/bin/bash", "-c",
cmd).redirectErrorStream(true).start()
+ int exitcode = proc.waitFor()
+ if (exitcode != 0) {
+ staticLogger.info("exit code: ${exitcode}, output\n:
${proc.text}")
+ if (mustSuc == true) {
+ Assert.assertEquals(0, exitcode)
+ }
+ }
+ } catch (IOException e) {
+ Assert.assertTrue(false, "execute timeout")
+ }
+ }
+
+ ipList.each { beid, ip ->
+ String cmd = "ssh -o StrictHostKeyChecking=no ${username}@${ip}
\"rm -r ${path}\""
+ logger.info("Execute: ${cmd}".toString())
+ executeCommand(cmd, false)
+ }
+ }
+
String cmd(String cmd, int timeoutSecond = 0) {
var processBuilder = new ProcessBuilder()
processBuilder.command("/bin/bash", "-c", cmd)
diff --git a/regression-test/suites/export_p0/test_export_basic.groovy
b/regression-test/suites/export_p0/test_export_basic.groovy
index 152f1ab4e6e..19261c55e01 100644
--- a/regression-test/suites/export_p0/test_export_basic.groovy
+++ b/regression-test/suites/export_p0/test_export_basic.groovy
@@ -59,7 +59,8 @@ suite("test_export_basic", "p0") {
def table_export_name = "test_export_basic"
def table_load_name = "test_load_basic"
- def outfile_path_prefix = """/tmp/test_export"""
+ def outfile_path_prefix = """/tmp/test_export_basic"""
+ def local_tvf_prefix = "tmp/test_export_basic"
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -96,29 +97,13 @@ suite("test_export_basic", "p0") {
qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- }
-
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export = { the_db, export_label ->
@@ -152,7 +137,7 @@ suite("test_export_basic", "p0") {
// 1. basic test
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
def label = "label_${uuid}"
try {
// check export path
@@ -169,9 +154,6 @@ suite("test_export_basic", "p0") {
);
"""
waiting_export.call(db, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -184,28 +166,23 @@ suite("test_export_basic", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id, Name, age'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(150, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -215,9 +192,9 @@ suite("test_export_basic", "p0") {
delete_files.call("${outFilePath}")
}
- // 2. test patition1
+ // 2. test partition1
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
@@ -235,9 +212,6 @@ suite("test_export_basic", "p0") {
);
"""
waiting_export.call(db, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -250,28 +224,23 @@ suite("test_export_basic", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id, Name, age'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(19, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -281,9 +250,9 @@ suite("test_export_basic", "p0") {
delete_files.call("${outFilePath}")
}
- // 3. test patition2
+ // 3. test partition2
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
@@ -301,9 +270,6 @@ suite("test_export_basic", "p0") {
);
"""
waiting_export.call(db, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -316,28 +282,23 @@ suite("test_export_basic", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id, Name, age'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(50, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -347,9 +308,9 @@ suite("test_export_basic", "p0") {
delete_files.call("${outFilePath}")
}
- // 4. test patition3 and where clause
+ // 4. test partition3 and where clause
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
@@ -367,9 +328,6 @@ suite("test_export_basic", "p0") {
);
"""
waiting_export.call(db, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -382,31 +340,26 @@ suite("test_export_basic", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id, Name, age'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(50, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
- qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
+ qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
} finally {
try_sql("DROP TABLE IF EXISTS ${table_load_name}")
@@ -415,7 +368,7 @@ suite("test_export_basic", "p0") {
// 5. test order by and limit clause
def uuid1 = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid1}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
def label1 = "label_${uuid1}"
def uuid2 = UUID.randomUUID().toString()
def label2 = "label_${uuid2}"
@@ -447,9 +400,6 @@ suite("test_export_basic", "p0") {
waiting_export.call(db, label1)
waiting_export.call(db, label2)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 2)
-
// check show export correctness
def res = sql """ show export where STATE = "FINISHED" order by JobId
desc limit 2"""
assertTrue(res[0][0] > res[1][0])
@@ -461,7 +411,7 @@ suite("test_export_basic", "p0") {
// 6. test columns property
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
@@ -480,9 +430,6 @@ suite("test_export_basic", "p0") {
);
"""
waiting_export.call(db, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -494,28 +441,23 @@ suite("test_export_basic", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id, Name'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(67, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name} (id, Name)
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load6 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -527,7 +469,7 @@ suite("test_export_basic", "p0") {
// 7. test columns property 2
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
@@ -546,9 +488,6 @@ suite("test_export_basic", "p0") {
);
"""
waiting_export.call(db, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -559,28 +498,23 @@ suite("test_export_basic", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(67, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load7 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -599,7 +533,7 @@ suite("test_export_basic", "p0") {
// 1. first export
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
label = "label_${uuid}"
// check export path
check_path_exists.call("${outFilePath}")
diff --git a/regression-test/suites/export_p0/test_export_csv.groovy
b/regression-test/suites/export_p0/test_export_csv.groovy
index 93e894a4f65..09d06996b7f 100644
--- a/regression-test/suites/export_p0/test_export_csv.groovy
+++ b/regression-test/suites/export_p0/test_export_csv.groovy
@@ -56,7 +56,8 @@ suite("test_export_csv", "p0") {
def table_export_name = "test_export_csv"
def table_load_name = "test_load_csv"
- def outfile_path_prefix = """/tmp/test_export"""
+ def outfile_path_prefix = """/tmp/test_export_csv"""
+ def local_tvf_prefix = "tmp/test_export_csv"
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -98,30 +99,13 @@ suite("test_export_csv", "p0") {
logger.info("insert result: " + insert_res.toString())
qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY
user_id; """
-
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- }
-
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export = { export_label ->
@@ -140,7 +124,7 @@ suite("test_export_csv", "p0") {
// 1. test more type
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
def label = "label_${uuid}"
try {
// check export path
@@ -157,9 +141,6 @@ suite("test_export_csv", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
@@ -184,28 +165,22 @@ suite("test_export_csv", "p0") {
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, largeint_col, float_col, double_col, char_col,
decimal_col, ipv4_col, ipv6_col'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(100, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
@@ -218,7 +193,7 @@ suite("test_export_csv", "p0") {
// 2. test csv column_separator and line_delimiter
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -236,9 +211,6 @@ suite("test_export_csv", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
@@ -263,29 +235,23 @@ suite("test_export_csv", "p0") {
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', 'ab'
- set 'line_delimiter', 'cc'
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, largeint_col, float_col, double_col, char_col,
decimal_col, ipv4_col, ipv6_col'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(10, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = "ab",
+ "line_delimiter" = "cc");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
@@ -297,7 +263,7 @@ suite("test_export_csv", "p0") {
// 3. test csv_with_names
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -315,9 +281,6 @@ suite("test_export_csv", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
@@ -342,30 +305,24 @@ suite("test_export_csv", "p0") {
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', 'ab'
- set 'line_delimiter', 'cc'
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, largeint_col, float_col, double_col, char_col,
decimal_col, ipv4_col, ipv6_col'
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(10, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select *
+ from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv_with_names",
+ "line_delimiter" = "cc",
+ "column_separator" = "ab");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
@@ -377,7 +334,7 @@ suite("test_export_csv", "p0") {
// 4. test csv_with_names_and_types
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -395,9 +352,6 @@ suite("test_export_csv", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
@@ -422,30 +376,24 @@ suite("test_export_csv", "p0") {
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', 'ab'
- set 'line_delimiter', 'cc'
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, largeint_col, float_col, double_col, char_col,
decimal_col, ipv4_col, ipv6_col'
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names_and_types'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(10, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select *
+ from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv_with_names_and_types",
+ "line_delimiter" = "cc",
+ "column_separator" = "ab");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
diff --git
a/regression-test/suites/export_p0/test_export_data_consistency.groovy
b/regression-test/suites/export_p0/test_export_data_consistency.groovy
index cd19b082802..e1d4ba16385 100644
--- a/regression-test/suites/export_p0/test_export_data_consistency.groovy
+++ b/regression-test/suites/export_p0/test_export_data_consistency.groovy
@@ -59,6 +59,7 @@ suite("test_export_data_consistency", "p0") {
def table_export_name = "test_export_data_consistency"
def table_load_name = "test_load_data_consistency"
def outfile_path_prefix = """/tmp/test_export_data_consistency"""
+ def local_tvf_prefix = "tmp/test_export_data_consistency"
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -96,29 +97,13 @@ suite("test_export_data_consistency", "p0") {
qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- }
-
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export = { the_db, export_label ->
@@ -137,7 +122,7 @@ suite("test_export_data_consistency", "p0") {
// 1. basic test
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
def label = "label_${uuid}"
try {
// check export path
@@ -168,9 +153,6 @@ suite("test_export_data_consistency", "p0") {
// wait export
waiting_export.call(db, label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 3)
-
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
@@ -182,29 +164,22 @@ suite("test_export_data_consistency", "p0") {
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- for (exportLoadFile in files) {
- String file_path = exportLoadFile.getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, age'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(0, json.NumberFilteredRows)
- }
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
// The partition ranges are:
@@ -212,7 +187,7 @@ suite("test_export_data_consistency", "p0") {
// The export task should keep partition consistency.
def result = sql """ SELECT * FROM ${table_load_name} t WHERE id in
(10,15,30,40,80,90) ORDER BY id; """
logger.info("result ${result}")
- assert result.size == 6
+ assert result.size() == 6
if (result[0][1] == 'test') {
assert result[1][1] == 'test'
} else {
diff --git a/regression-test/suites/export_p0/test_export_empty_table.groovy
b/regression-test/suites/export_p0/test_export_empty_table.groovy
index 584c65d73bc..bd2b4ad09d1 100644
--- a/regression-test/suites/export_p0/test_export_empty_table.groovy
+++ b/regression-test/suites/export_p0/test_export_empty_table.groovy
@@ -56,8 +56,8 @@ suite("test_export_empty_table", "p0") {
}
def table_export_name = "test_export_empty_table"
- def table_load_name = "test_load_empty_table"
- def outfile_path_prefix = """/tmp/test_export"""
+ def outfile_path_prefix = """/tmp/test_export_empty_table"""
+ def local_tvf_prefix = "tmp/test_export_empty_table"
// create table
sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -84,29 +84,13 @@ suite("test_export_empty_table", "p0") {
qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY
user_id; """
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- }
-
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export = { export_label ->
@@ -125,7 +109,7 @@ suite("test_export_empty_table", "p0") {
// 1. test csv
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
def label = "label_${uuid}"
try {
// check export path
@@ -142,8 +126,25 @@ suite("test_export_empty_table", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 0)
+ // use local() tvf to check
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ test {
+ sql """
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+
+ // because we export the empty data,
+ // there is no files.
+ exception "get file list from backend failed."
+ }
+ }
} finally {
delete_files.call("${outFilePath}")
}
@@ -151,7 +152,7 @@ suite("test_export_empty_table", "p0") {
// 2. test parquet
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -167,15 +168,32 @@ suite("test_export_empty_table", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 0)
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ test {
+ sql """
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+
+ // because we export the empty data,
+ // there is no files.
+ exception "get file list from backend failed."
+ }
+ }
} finally {
delete_files.call("${outFilePath}")
}
// 3. test orc
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -191,15 +209,32 @@ suite("test_export_empty_table", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 0)
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ test {
+ sql """
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+
+ // because we export the empty data,
+ // there is no files.
+ exception "get file list from backend failed."
+ }
+ }
} finally {
delete_files.call("${outFilePath}")
}
// 4. test csv_with_names
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -216,17 +251,33 @@ suite("test_export_empty_table", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 0)
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ test {
+ sql """
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+
+ // because we export the empty data,
+ // there is no files.
+ exception "get file list from backend failed."
+ }
+ }
} finally {
- try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
// 5. test csv_with_names_and_types
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -243,8 +294,25 @@ suite("test_export_empty_table", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 0)
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ test {
+ sql """
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ",");
+ """
+
+ // because we export the empty data,
+ // there is no files.
+ exception "get file list from backend failed."
+ }
+ }
} finally {
delete_files.call("${outFilePath}")
}
diff --git
a/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
b/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
index f4c64eb4e94..e6ac6684b5d 100644
--- a/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
+++ b/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
@@ -54,10 +54,11 @@ suite("test_export_table_with_label_retry", "p0") {
return
}
- def table_export_name = "test_export_label"
- def table_load_name = "test_load_label"
- def wrong_outfile_path_prefix = """tmp/test_export"""
- def outfile_path_prefix = """/tmp/test_export"""
+ def table_export_name = "test_export_table_with_label_retry"
+ def table_load_name = "test_load_table_with_label_retry"
+ def wrong_outfile_path_prefix =
"""mnt/disk2/ftw/projects/doris/output/be"""
+ def outfile_path_prefix = """/tmp/test_export_table_with_label_retry"""
+ def local_tvf_prefix = "tmp/test_export_table_with_label_retry"
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -99,30 +100,13 @@ suite("test_export_table_with_label_retry", "p0") {
logger.info("insert result: " + insert_res.toString())
qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY
user_id; """
-
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- }
-
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export_expect_failed = { export_label ->
@@ -155,8 +139,8 @@ suite("test_export_table_with_label_retry", "p0") {
}
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
- def wrongFilePath = """${wrong_outfile_path_prefix}_${uuid}"""
+ def outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
+ def wrongFilePath = "${wrong_outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
def label = "label_${uuid}"
try {
// check export path
@@ -173,9 +157,6 @@ suite("test_export_table_with_label_retry", "p0") {
"""
waiting_export_expect_failed.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 0)
-
// exec right export with same label again
sql """
EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
@@ -188,9 +169,6 @@ suite("test_export_table_with_label_retry", "p0") {
waiting_export_expect_success.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
@@ -215,28 +193,22 @@ suite("test_export_table_with_label_retry", "p0") {
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, largeint_col, float_col, double_col, char_col,
decimal_col, ipv4_col, ipv6_col'
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(100, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
diff --git a/regression-test/suites/export_p0/test_export_view.groovy
b/regression-test/suites/export_p0/test_export_view.groovy
index dfd020b02fa..0a7f8245a11 100644
--- a/regression-test/suites/export_p0/test_export_view.groovy
+++ b/regression-test/suites/export_p0/test_export_view.groovy
@@ -54,27 +54,12 @@ suite("test_export_view", "p0") {
return
}
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- }
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export = { export_label ->
while (true) {
@@ -106,7 +91,8 @@ suite("test_export_view", "p0") {
def table_export_name = "test_export_base_table"
def table_export_view_name = "test_export_view_table"
def table_load_name = "test_load_view_basic"
- def outfile_path_prefix = """/tmp/test_export"""
+ def outfile_path_prefix = """/tmp/test_export_view"""
+ def local_tvf_prefix = "tmp/test_export_view"
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -175,7 +161,7 @@ suite("test_export_view", "p0") {
// 1. basic test
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def outFilePath =
"${outfile_path_prefix}/${table_export_view_name}_${uuid}"
def label = "label_${uuid}"
try {
// check export path
@@ -192,33 +178,27 @@ suite("test_export_view", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(5, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load1 """ SELECT * FROM ${table_load_name} t; """
@@ -230,7 +210,7 @@ suite("test_export_view", "p0") {
// 2. test csv_with_names
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_view_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -249,34 +229,27 @@ suite("test_export_view", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(5, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv_with_names"
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load2 """ SELECT * FROM ${table_load_name} t; """
@@ -289,7 +262,7 @@ suite("test_export_view", "p0") {
// 3. test where clause
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_view_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -309,34 +282,27 @@ suite("test_export_view", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(2, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv_with_names"
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load3 """ SELECT * FROM ${table_load_name}; """
@@ -349,7 +315,7 @@ suite("test_export_view", "p0") {
// 4. test where clause and columns property
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_view_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -371,35 +337,26 @@ suite("test_export_view", "p0") {
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
- set 'columns', 'k3, s1, k1'
- set 'format', 'csv_with_names'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(2, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select s1,k1, null as k2,k3 from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv_with_names"
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load4 """ SELECT * FROM ${table_load_name} t; """
@@ -412,7 +369,7 @@ suite("test_export_view", "p0") {
// 5. test csv_with_names_and_types
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_view_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -431,34 +388,26 @@ suite("test_export_view", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names_and_types'
- set 'column_separator', ','
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(5, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv_with_names_and_types"
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load5 """ SELECT * FROM ${table_load_name} t; """
@@ -471,7 +420,7 @@ suite("test_export_view", "p0") {
// 6. test orc type
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_view_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -488,33 +437,25 @@ suite("test_export_view", "p0") {
"""
waiting_export.call(label)
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'strict_mode', 'true'
- set 'format', 'orc'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(5, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "orc"
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load6 """ SELECT * FROM ${table_load_name} t; """
@@ -527,7 +468,7 @@ suite("test_export_view", "p0") {
// 8. test orc type, where clause and columns property
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_view_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -547,35 +488,25 @@ suite("test_export_view", "p0") {
"""
waiting_export.call(label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'strict_mode', 'true'
- set 'columns', 'k3, s1, k1'
- set 'format', 'orc'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(2, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select s1,k1, null as k2,k3 from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "orc"
+ );
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load8 """ SELECT * FROM ${table_load_name} t; """
diff --git
a/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
b/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
index 8125db28bf1..2b2d41e2b4d 100644
---
a/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
+++
b/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
@@ -54,37 +54,12 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
return
}
+ def machine_user_name = "root"
def check_path_exists = { dir_path ->
- List<List<Object>> backends = sql """ show backends """
- assertTrue(backends.size() > 0)
- File path = new File(dir_path)
- if (!path.exists()) {
- assert path.mkdirs()
- } else {
- throw new IllegalStateException("""${dir_path} already exists! """)
- }
- if (backends.size() > 1) {
- for (List<Object> backend : backends) {
- def be_host = backend[1]
- def cmd="""mkdir -p ${dir_path}"""
- sshExec("root", be_host, cmd.toString())
- }
- }
-
- }
- def check_file_amounts = { dir_path, amount ->
- File path = new File(dir_path)
- File[] files = path.listFiles()
- assert files.length == amount
+ mkdirRemotePathOnAllBE(machine_user_name, dir_path)
}
def delete_files = { dir_path ->
- File path = new File(dir_path)
- if (path.exists()) {
- for (File f: path.listFiles()) {
- f.delete();
- }
- path.delete();
- }
+ deleteRemotePathOnAllBE(machine_user_name, dir_path)
}
def waiting_export = { ctlName, dbName, export_label ->
while (true) {
@@ -99,12 +74,12 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
}
}
}
-
// this table name must be `test1`, because this is an external table.
def table_export_name = "test1"
def table_load_name = "test_load_external__basic"
- def outfile_path_prefix = """/tmp/test_export"""
+ def outfile_path_prefix = """/tmp/test_export_external_table"""
+ def local_tvf_prefix = "tmp/test_export_external_table"
String enabled = context.config.otherConfigs.get("enableJdbcTest")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -163,7 +138,7 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
// 1. basic test
def uuid = UUID.randomUUID().toString()
- def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
def label = "label_${uuid}"
try {
// check export path
@@ -180,34 +155,27 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
);
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(100, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into
internal.${internal_db_name}.${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load1 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -218,7 +186,7 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
// 2. export external table under internal catalog
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -237,34 +205,27 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(100, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into
internal.${internal_db_name}.${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "column_separator" = ","
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load2 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -276,7 +237,7 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
sql """ switch ${catalog_name} """
// 3. csv_with_names
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -293,35 +254,27 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
);
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(30, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into
internal.${internal_db_name}.${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv_with_names"
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load3 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -333,7 +286,7 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
// 4. csv_with_names_and_types
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -350,35 +303,27 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
);
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names_and_types'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(30, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into
internal.${internal_db_name}.${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "column_separator" = ",",
+ "format" = "csv_with_names_and_types"
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load4 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -390,7 +335,7 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
// 5. orc
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -406,34 +351,26 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
);
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'strict_mode', 'true'
- set 'format', 'orc'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(30, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into
internal.${internal_db_name}.${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "orc"
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load5 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -442,10 +379,9 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
delete_files.call("${outFilePath}")
}
-
- // 5. parquet
+ // 6. parquet
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -461,34 +397,26 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
);
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'strict_mode', 'true'
- set 'format', 'parquet'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(30, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into internal.${internal_db_name}.${table_load_name}
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "parquet"
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load6 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -500,7 +428,7 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
// 7. test columns property
uuid = UUID.randomUUID().toString()
- outFilePath = """${outfile_path_prefix}_${uuid}"""
+ outFilePath = "${outfile_path_prefix}" +
"/${table_export_name}_${uuid}"
label = "label_${uuid}"
try {
// check export path
@@ -518,36 +446,27 @@ suite("test_export_external_table",
"p0,external,mysql,external_docker,external_
);
"""
waiting_export.call(catalog_name, ex_db_name, label)
-
- // check file amounts
- check_file_amounts.call("${outFilePath}", 1)
// check data correctness
create_load_table(table_load_name)
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'column_separator', ','
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names'
- set 'columns', 'k8, k1, k5, k3, k7'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(30, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into
internal.${internal_db_name}.${table_load_name} from local()")
+ sql """
+ insert into
internal.${internal_db_name}.${table_load_name} (k8, k1, k5, k3, k7)
+ select * from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv_with_names",
+ "column_separator" = ","
+ );
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
}
order_qt_select_load7 """ SELECT * FROM
internal.${internal_db_name}.${table_load_name} order by k8; """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]