This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0c977478237 [fix](regression test) fix test_abort_txn_by_fe (#41382)
0c977478237 is described below
commit 0c97747823749c6a12ddf81c639ba9efa718c789
Author: yujun <[email protected]>
AuthorDate: Wed Oct 2 18:46:07 2024 +0800
[fix](regression test) fix test_abort_txn_by_fe (#41382)
1. fix regression test test_abort_txn_by_fe;
2. delete duplicated suites that differ only in ClusterOptions.cloudMode =
true/false; they are unnecessary because ClusterOptions.cloudMode = null
always supports both modes;
3. improvement: when running a docker suite with ClusterOptions.cloudMode ==
null, if there is no external doris cluster, the suite will run in both cloud
and not_cloud modes;
---
be/src/cloud/cloud_stream_load_executor.cpp | 2 +
.../org/apache/doris/regression/Config.groovy | 7 +
.../org/apache/doris/regression/suite/Suite.groovy | 28 ++--
.../doris/regression/suite/SuiteCluster.groovy | 7 +
...e_cloud1.groovy => test_abort_txn_by_be.groovy} | 35 +++--
.../test_abort_txn_by_be_cloud2.groovy | 164 ---------------------
.../test_abort_txn_by_be_local5.groovy | 164 ---------------------
.../test_abort_txn_by_be_local6.groovy | 164 ---------------------
...e_cloud4.groovy => test_abort_txn_by_fe.groovy} | 42 ++++--
.../test_abort_txn_by_fe_local3.groovy | 104 -------------
10 files changed, 85 insertions(+), 632 deletions(-)
diff --git a/be/src/cloud/cloud_stream_load_executor.cpp
b/be/src/cloud/cloud_stream_load_executor.cpp
index 1352b4aac81..46ceca851e2 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -23,6 +23,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "runtime/stream_load/stream_load_context.h"
+#include "util/debug_points.h"
namespace doris {
@@ -96,6 +97,7 @@ Status
CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
}
Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+ DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
// forward to fe to excute commit transaction for MoW table
if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
ctx->load_type == TLoadType::ROUTINE_LOAD) {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
index 5e79ccef21d..4487d76beaa 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
@@ -518,6 +518,13 @@ class Config {
// mainly auth_xxx cases use defaultDb, these suites better not use
defaultDb
config.createDefaultDb()
+ try {
+ config.fetchCloudMode()
+ } catch (Exception e) {
+ // docker suite no need external cluster.
+ // so can ignore error here.
+ }
+
return config
}
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 869d4afa486..099799ed8d6 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -290,17 +290,13 @@ class Suite implements GroovyInterceptable {
+ "see example demo_p0/docker_action.groovy")
}
- try {
- context.config.fetchCloudMode()
- } catch (Exception e) {
- }
-
- boolean dockerIsCloud = false
if (options.cloudMode == null) {
if (context.config.runMode == RunMode.UNKNOWN) {
- throw new Exception("Bad run mode, cloud or not_cloud is
unknown")
+ dockerImpl(options, false, actionSupplier)
+ dockerImpl(options, true, actionSupplier)
+ } else {
+ dockerImpl(options, context.config.runMode == RunMode.CLOUD,
actionSupplier)
}
- dockerIsCloud = context.config.runMode == RunMode.CLOUD
} else {
if (options.cloudMode == true && context.config.runMode ==
RunMode.NOT_CLOUD) {
return
@@ -308,12 +304,16 @@ class Suite implements GroovyInterceptable {
if (options.cloudMode == false && context.config.runMode ==
RunMode.CLOUD) {
return
}
- dockerIsCloud = options.cloudMode
+ dockerImpl(options, options.cloudMode, actionSupplier)
}
+ }
+
+ private void dockerImpl(ClusterOptions options, boolean isCloud, Closure
actionSupplier) throws Exception {
+ logger.info("=== start run suite {} in {} mode. ===", name, (isCloud ?
"cloud" : "not_cloud"))
try {
cluster.destroy(true)
- cluster.init(options, dockerIsCloud)
+ cluster.init(options, isCloud)
def user = context.config.jdbcUser
def password = context.config.jdbcPassword
@@ -329,7 +329,7 @@ class Suite implements GroovyInterceptable {
logger.info("get fe {}", fe)
assertNotNull(fe)
- if (!dockerIsCloud) {
+ if (!isCloud) {
for (def be : cluster.getAllBackends()) {
be_report_disk(be.host, be.httpPort)
}
@@ -1478,7 +1478,11 @@ class Suite implements GroovyInterceptable {
}
boolean isCloudMode() {
- return context.config.isCloudMode()
+ if (cluster.isRunning()) {
+ return cluster.isCloudMode()
+ } else {
+ return context.config.isCloudMode()
+ }
}
boolean enableStoragevault() {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index c272be39051..2aaece2c678 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -270,6 +270,7 @@ class SuiteCluster {
final Config config
private boolean running
private boolean sqlModeNodeMgr = false
+ private boolean isCloudMode = false
SuiteCluster(String name, Config config) {
this.name = name
@@ -282,6 +283,8 @@ class SuiteCluster {
assert options.feNum > 0 || options.beNum > 0
assert config.image != null && config.image != ''
+ this.isCloudMode = isCloud
+
def cmd = [
'up', name, config.image
]
@@ -515,6 +518,10 @@ class SuiteCluster {
return running
}
+ boolean isCloudMode() {
+ return this.isCloudMode
+ }
+
// if not specific fe indices, then start all frontends
void startFrontends(int... indices) {
runFrontendsCmd('start', indices)
diff --git
a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy
b/regression-test/suites/schema_change_p0/test_abort_txn_by_be.groovy
similarity index 88%
rename from
regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy
rename to regression-test/suites/schema_change_p0/test_abort_txn_by_be.groovy
index f2d0b767eb8..b1af2b1fcad 100644
--- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy
+++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_be.groovy
@@ -18,13 +18,15 @@
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.http.NoHttpResponseException
-suite('test_abort_txn_by_be_cloud1', 'docker') {
+suite('test_abort_txn_by_be', 'docker') {
+
+ def run_test = { enable_abort_txn_by_checking_coordinator_be,
enable_abort_txn_by_checking_conflict_txn ->
def options = new ClusterOptions()
- options.cloudMode = true
+ options.cloudMode = null
options.enableDebugPoints()
options.beConfigs += [ "enable_java_support=false" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=true" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=false" ]
+ options.feConfigs += [
"enable_abort_txn_by_checking_coordinator_be=${enable_abort_txn_by_checking_coordinator_be}"
]
+ options.feConfigs += [
"enable_abort_txn_by_checking_conflict_txn=${enable_abort_txn_by_checking_conflict_txn}"
]
options.beNum = 1
docker(options) {
@@ -59,6 +61,8 @@ suite('test_abort_txn_by_be_cloud1', 'docker') {
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""
+
GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block')
+
thread {
streamLoad {
// a default db 'regression_test' is specified in
@@ -86,22 +90,21 @@ suite('test_abort_txn_by_be_cloud1', 'docker') {
// if declared a check callback, the default check condition
will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
- assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
- sleep(10000)
+ if (isCloudMode()) {
+ sleep 3000
+ } else {
+ def dbId = getDbId()
+ dockerAwaitUntil(20, {
+ def txns = sql_return_maparray("show proc
'/transactions/${dbId}/running'")
+ txns.size() > 0
+ })
+ }
sql """ alter table ${tableName} modify column lo_suppkey bigint NULL
"""
-
String result = ""
int max_try_time = 3000
while (max_try_time--){
@@ -161,4 +164,8 @@ suite('test_abort_txn_by_be_cloud1', 'docker') {
}
}
}
+ }
+
+ run_test(true, false)
+ run_test(false, true)
}
diff --git
a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy
b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy
deleted file mode 100644
index 7264ac7f90a..00000000000
--- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy
+++ /dev/null
@@ -1,164 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.http.NoHttpResponseException
-
-suite('test_abort_txn_by_be_cloud2', 'docker') {
- def options = new ClusterOptions()
- options.cloudMode = true
- options.enableDebugPoints()
- options.beConfigs += [ "enable_java_support=false" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false"
]
- options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ]
- options.beNum = 1
-
- docker(options) {
- def getJobState = { tableName ->
- def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
- return jobStateResult[0][9]
- }
-
- def s3BucketName = getS3BucketName()
- def s3WithProperties = """WITH S3 (
- |"AWS_ACCESS_KEY" = "${getS3AK()}",
- |"AWS_SECRET_KEY" = "${getS3SK()}",
- |"AWS_ENDPOINT" = "${getS3Endpoint()}",
- |"AWS_REGION" = "${getS3Region()}",
- |"provider" = "${getS3Provider()}")
- |PROPERTIES(
- |"exec_mem_limit" = "8589934592",
- |"load_parallelism" = "3")""".stripMargin()
-
- // set fe configuration
- sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' =
'161061273600')"
-
- def tableName = "lineorder"
- // create table if not exists
- sql new
File("""${context.file.parent}/ddl/lineorder_delete.sql""").text
- sql new
File("""${context.file.parent}/ddl/lineorder_create.sql""").text
-
- def coordinatorBe = cluster.getAllBackends().get(0)
- def coordinatorBeHost = coordinatorBe.host
-
- def column =
"""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
-
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
-
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""
-
- thread {
- streamLoad {
- // a default db 'regression_test' is specified in
- // ${DORIS_HOME}/conf/regression-conf.groovy
- table tableName
-
- // default label is UUID:
- // set 'label' UUID.randomUUID().toString()
-
- // default column_separator is specify in doris fe config,
usually is '\t'.
- // this line change to ','
- set 'column_separator', '|'
- set 'compress_type', 'GZ'
- set 'columns', column
-
-
- // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
- // also, you can stream load a http stream, e.g.
http://xxx/some.csv
- file
"""${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz"""
-
- time 600 * 1000
-
- // stream load action will check result, include Success
status, and NumberTotalRows == NumberLoadedRows
-
- // if declared a check callback, the default check condition
will ignore.
- // So you must check all condition
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
- assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
- }
- }
- }
-
- sleep(10000)
-
- sql """ alter table ${tableName} modify column lo_suppkey bigint NULL
"""
-
- String result = ""
- int max_try_time = 3000
- while (max_try_time--){
- result = getJobState(tableName)
- if (result == "PENDING") {
- sleep(3000)
- } else {
- break;
- }
- }
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- assertEquals(result, "WAITING_TXN");
-
- cluster.stopBackends(coordinatorBe.index)
- def isDead = false
- for (def i = 0; i < 10; i++) {
- def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
- if (!be.Alive.toBoolean()) {
- isDead = true
- break
- }
- sleep 1000
- }
- assertTrue(isDead)
- sleep 10000
-
- result = getJobState(tableName)
- assertEquals(result, "WAITING_TXN");
-
- // coordinatorBe restart, abort txn on it
- cluster.startBackends(coordinatorBe.index)
- def isAlive = false
- for (def i = 0; i < 20; i++) {
- def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
- if (be.Alive.toBoolean()) {
- isAlive = true
- break
- }
- sleep 1000
- }
- assertTrue(isAlive)
- sleep 5000
-
- max_try_time = 3000
- while (max_try_time--){
- result = getJobState(tableName)
- if (result == "FINISHED") {
- sleep(3000)
- break
- } else {
- sleep(100)
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- }
- }
- }
-}
diff --git
a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy
b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy
deleted file mode 100644
index 3835da4ccb2..00000000000
--- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy
+++ /dev/null
@@ -1,164 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.http.NoHttpResponseException
-
-suite('test_abort_txn_by_be_local5', 'docker') {
- def options = new ClusterOptions()
- options.cloudMode = false
- options.enableDebugPoints()
- options.beConfigs += [ "enable_java_support=false" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=true" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=false" ]
- options.beNum = 1
-
- docker(options) {
- def getJobState = { tableName ->
- def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
- return jobStateResult[0][9]
- }
-
- def s3BucketName = getS3BucketName()
- def s3WithProperties = """WITH S3 (
- |"AWS_ACCESS_KEY" = "${getS3AK()}",
- |"AWS_SECRET_KEY" = "${getS3SK()}",
- |"AWS_ENDPOINT" = "${getS3Endpoint()}",
- |"AWS_REGION" = "${getS3Region()}",
- |"provider" = "${getS3Provider()}")
- |PROPERTIES(
- |"exec_mem_limit" = "8589934592",
- |"load_parallelism" = "3")""".stripMargin()
-
- // set fe configuration
- sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' =
'161061273600')"
-
- def tableName = "lineorder"
- // create table if not exists
- sql new
File("""${context.file.parent}/ddl/lineorder_delete.sql""").text
- sql new
File("""${context.file.parent}/ddl/lineorder_create.sql""").text
-
- def coordinatorBe = cluster.getAllBackends().get(0)
- def coordinatorBeHost = coordinatorBe.host
-
- def column =
"""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
-
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
-
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""
-
- thread {
- streamLoad {
- // a default db 'regression_test' is specified in
- // ${DORIS_HOME}/conf/regression-conf.groovy
- table tableName
-
- // default label is UUID:
- // set 'label' UUID.randomUUID().toString()
-
- // default column_separator is specify in doris fe config,
usually is '\t'.
- // this line change to ','
- set 'column_separator', '|'
- set 'compress_type', 'GZ'
- set 'columns', column
-
-
- // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
- // also, you can stream load a http stream, e.g.
http://xxx/some.csv
- file
"""${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz"""
-
- time 600 * 1000
-
- // stream load action will check result, include Success
status, and NumberTotalRows == NumberLoadedRows
-
- // if declared a check callback, the default check condition
will ignore.
- // So you must check all condition
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
- assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
- }
- }
- }
-
- sleep(10000)
-
- sql """ alter table ${tableName} modify column lo_suppkey bigint NULL
"""
-
- String result = ""
- int max_try_time = 3000
- while (max_try_time--){
- result = getJobState(tableName)
- if (result == "PENDING") {
- sleep(3000)
- } else {
- break;
- }
- }
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- assertEquals(result, "WAITING_TXN");
-
- cluster.stopBackends(coordinatorBe.index)
- def isDead = false
- for (def i = 0; i < 10; i++) {
- def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
- if (!be.Alive.toBoolean()) {
- isDead = true
- break
- }
- sleep 1000
- }
- assertTrue(isDead)
- sleep 10000
-
- result = getJobState(tableName)
- assertEquals(result, "WAITING_TXN");
-
- // coordinatorBe restart, abort txn on it
- cluster.startBackends(coordinatorBe.index)
- def isAlive = false
- for (def i = 0; i < 20; i++) {
- def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
- if (be.Alive.toBoolean()) {
- isAlive = true
- break
- }
- sleep 1000
- }
- assertTrue(isAlive)
- sleep 20000
-
- max_try_time = 3000
- while (max_try_time--){
- result = getJobState(tableName)
- if (result == "FINISHED") {
- sleep(3000)
- break
- } else {
- sleep(100)
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- }
- }
- }
-}
diff --git
a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy
b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy
deleted file mode 100644
index ff53c412590..00000000000
--- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy
+++ /dev/null
@@ -1,164 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.http.NoHttpResponseException
-
-suite('test_abort_txn_by_be_local6', 'docker') {
- def options = new ClusterOptions()
- options.cloudMode = false
- options.enableDebugPoints()
- options.beConfigs += [ "enable_java_support=false" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false"
]
- options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ]
- options.beNum = 1
-
- docker(options) {
- def getJobState = { tableName ->
- def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
- return jobStateResult[0][9]
- }
-
- def s3BucketName = getS3BucketName()
- def s3WithProperties = """WITH S3 (
- |"AWS_ACCESS_KEY" = "${getS3AK()}",
- |"AWS_SECRET_KEY" = "${getS3SK()}",
- |"AWS_ENDPOINT" = "${getS3Endpoint()}",
- |"AWS_REGION" = "${getS3Region()}",
- |"provider" = "${getS3Provider()}")
- |PROPERTIES(
- |"exec_mem_limit" = "8589934592",
- |"load_parallelism" = "3")""".stripMargin()
-
- // set fe configuration
- sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' =
'161061273600')"
-
- def tableName = "lineorder"
- // create table if not exists
- sql new
File("""${context.file.parent}/ddl/lineorder_delete.sql""").text
- sql new
File("""${context.file.parent}/ddl/lineorder_create.sql""").text
-
- def coordinatorBe = cluster.getAllBackends().get(0)
- def coordinatorBeHost = coordinatorBe.host
-
- def column =
"""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
-
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
-
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""
-
- thread {
- streamLoad {
- // a default db 'regression_test' is specified in
- // ${DORIS_HOME}/conf/regression-conf.groovy
- table tableName
-
- // default label is UUID:
- // set 'label' UUID.randomUUID().toString()
-
- // default column_separator is specify in doris fe config,
usually is '\t'.
- // this line change to ','
- set 'column_separator', '|'
- set 'compress_type', 'GZ'
- set 'columns', column
-
-
- // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
- // also, you can stream load a http stream, e.g.
http://xxx/some.csv
- file
"""${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz"""
-
- time 600 * 1000
-
- // stream load action will check result, include Success
status, and NumberTotalRows == NumberLoadedRows
-
- // if declared a check callback, the default check condition
will ignore.
- // So you must check all condition
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
- assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
- }
- }
- }
-
- sleep(10000)
-
- sql """ alter table ${tableName} modify column lo_suppkey bigint NULL
"""
-
- String result = ""
- int max_try_time = 3000
- while (max_try_time--){
- result = getJobState(tableName)
- if (result == "PENDING") {
- sleep(3000)
- } else {
- break;
- }
- }
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- assertEquals(result, "WAITING_TXN");
-
- cluster.stopBackends(coordinatorBe.index)
- def isDead = false
- for (def i = 0; i < 10; i++) {
- def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
- if (!be.Alive.toBoolean()) {
- isDead = true
- break
- }
- sleep 1000
- }
- assertTrue(isDead)
- sleep 10000
-
- result = getJobState(tableName)
- assertEquals(result, "WAITING_TXN");
-
- // coordinatorBe restart, abort txn on it
- cluster.startBackends(coordinatorBe.index)
- def isAlive = false
- for (def i = 0; i < 20; i++) {
- def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
- if (be.Alive.toBoolean()) {
- isAlive = true
- break
- }
- sleep 1000
- }
- assertTrue(isAlive)
- sleep 5000
-
- max_try_time = 3000
- while (max_try_time--){
- result = getJobState(tableName)
- if (result == "FINISHED") {
- sleep(3000)
- break
- } else {
- sleep(100)
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- }
- }
- }
-}
diff --git
a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy
b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe.groovy
similarity index 75%
rename from
regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy
rename to regression-test/suites/schema_change_p0/test_abort_txn_by_fe.groovy
index 80b61e16efd..d93e8a203e3 100644
--- a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy
+++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe.groovy
@@ -18,13 +18,17 @@
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.http.NoHttpResponseException
-suite('test_abort_txn_by_fe_cloud4', 'docker') {
+suite('test_abort_txn_by_fe', 'docker') {
def options = new ClusterOptions()
- options.cloudMode = true
+ options.cloudMode = null
options.enableDebugPoints()
options.beConfigs += [ "enable_java_support=false" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false"
]
- options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ]
+ options.feConfigs += [
+ "load_checker_interval_second=2",
+ "enable_abort_txn_by_checking_coordinator_be=false",
+ "enable_abort_txn_by_checking_conflict_txn=true",
+ ]
+ options.feNum = 3
options.beNum = 1
docker(options) {
@@ -59,10 +63,15 @@ suite('test_abort_txn_by_fe_cloud4', 'docker') {
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) +
s3WithProperties
sql loadSql
- def coordinatorFe = cluster.getAllFrontends().get(0)
- def coordinatorFeHost = coordinatorFe.host
-
- sleep(5000)
+ if (isCloudMode()) {
+ sleep 6000
+ } else {
+ def dbId = getDbId()
+ dockerAwaitUntil(20, {
+ def txns = sql_return_maparray("show proc
'/transactions/${dbId}/running'")
+ txns.any { it.Label == loadLabel }
+ })
+ }
sql """ alter table ${table} modify column lo_suppkey bigint NULL """
@@ -82,9 +91,22 @@ suite('test_abort_txn_by_fe_cloud4', 'docker') {
sleep 10000
assertEquals(result, "WAITING_TXN");
- cluster.restartFrontends()
- sleep(30000)
+ def oldMasterFe = cluster.getMasterFe()
+ cluster.restartFrontends(oldMasterFe.index)
+ boolean hasRestart = false
+ for (int i = 0; i < 30; i++) {
+ if (cluster.getFeByIndex(oldMasterFe.index).alive) {
+ hasRestart = true
+ break
+ }
+ sleep 1000
+ }
+ assertTrue(hasRestart)
context.reconnectFe()
+ if (!isCloudMode()) {
+ def newMasterFe = cluster.getMasterFe()
+ assertTrue(oldMasterFe.index != newMasterFe.index)
+ }
max_try_time = 3000
while (max_try_time--){
diff --git
a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy
b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy
deleted file mode 100644
index 32cd9d0eba7..00000000000
--- a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.http.NoHttpResponseException
-
-suite('test_abort_txn_by_fe_local3', 'docker') {
- def options = new ClusterOptions()
- options.cloudMode = false
- options.enableDebugPoints()
- options.beConfigs += [ "enable_java_support=false" ]
- options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false"
]
- options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ]
- options.beNum = 1
-
- docker(options) {
- def getJobState = { tableName ->
- def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
- return jobStateResult[0][9]
- }
-
- def s3BucketName = getS3BucketName()
- def s3WithProperties = """WITH S3 (
- |"AWS_ACCESS_KEY" = "${getS3AK()}",
- |"AWS_SECRET_KEY" = "${getS3SK()}",
- |"AWS_ENDPOINT" = "${getS3Endpoint()}",
- |"AWS_REGION" = "${getS3Region()}",
- |"provider" = "${getS3Provider()}")
- |PROPERTIES(
- |"exec_mem_limit" = "8589934592",
- |"load_parallelism" = "3")""".stripMargin()
-
- // set fe configuration
- sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' =
'161061273600')"
-
- def table= "lineorder"
- // create table if not exists
- sql new
File("""${context.file.parent}/ddl/lineorder_delete.sql""").text
- sql new
File("""${context.file.parent}/ddl/lineorder_create.sql""").text
- def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
- def loadLabel = table + '_' + uniqueID
-
- // load data from cos
- def loadSql = new
File("""${context.file.parent}/ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}",
s3BucketName)
- loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) +
s3WithProperties
- sql loadSql
-
- def coordinatorFe = cluster.getAllFrontends().get(0)
- def coordinatorFeHost = coordinatorFe.host
-
- sleep(5000)
-
- sql """ alter table ${table} modify column lo_suppkey bigint NULL """
-
- String result = ""
- int max_try_time = 3000
- while (max_try_time--){
- result = getJobState(table)
- if (result == "PENDING") {
- sleep(3000)
- } else {
- break;
- }
- }
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- sleep 10000
- assertEquals(result, "WAITING_TXN");
-
- cluster.restartFrontends()
- sleep(30000)
- context.reconnectFe()
-
- max_try_time = 3000
- while (max_try_time--){
- result = getJobState(table)
- System.out.println(result)
- if (result == "FINISHED") {
- sleep(3000)
- break
- } else {
- sleep(100)
- if (max_try_time < 1){
- assertEquals(1,2)
- }
- }
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]