This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5706b6bfe7c [test](show_data) test the correctness of data statistics in cloud mode (#44947)
5706b6bfe7c is described below
commit 5706b6bfe7c2f6115294167fb4cac0185799d061
Author: chunping <[email protected]>
AuthorDate: Thu Dec 12 10:39:26 2024 +0800
[test](show_data) test the correctness of data statistics in cloud mode (#44947)
---
aazcp.tar.gz | Bin 0 -> 4218 bytes
regression-test/conf/regression-conf.groovy | 7 +
regression-test/framework/pom.xml | 5 +
regression-test/plugins/aliyunOssSdk.groovy | 169 +++++++++++++++
.../suites/show_data/ddl/lineitem_delete.sql | 2 +
.../suites/show_data/ddl/lineitem_dup.sql | 25 +++
.../suites/show_data/ddl/lineitem_mow.sql | 25 +++
.../suites/show_data/test_show_mow_data.groovy | 240 +++++++++++++++++++++
8 files changed, 473 insertions(+)
diff --git a/aazcp.tar.gz b/aazcp.tar.gz
new file mode 100644
index 00000000000..681acf72cde
Binary files /dev/null and b/aazcp.tar.gz differ
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index ab9bb0beb91..bc001126bce 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -259,3 +259,10 @@ lakesoulMinioEndpoint="*******"
metaServiceToken = "greedisgood9999"
instanceId = "default_instance_id"
multiClusterInstance = "default_instance_id"
+
+storageProvider = "oss"
+cbsS3Ak = "*******"
+cbsS3Sk = "*******"
+cbsS3Endpoint = "oss-cn-beijing.aliyuncs.com"
+cbsS3Bucket = "test-bucket"
+cbsS3Prefix = "test-cluster-prefix"
diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml
index 813659989ae..cad25df8ced 100644
--- a/regression-test/framework/pom.xml
+++ b/regression-test/framework/pom.xml
@@ -409,6 +409,11 @@ under the License.
<!--Regression tests need to include this jar-->
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ <version>3.18.1</version>
+ </dependency>
</dependencies>
</project>
diff --git a/regression-test/plugins/aliyunOssSdk.groovy b/regression-test/plugins/aliyunOssSdk.groovy
new file mode 100644
index 00000000000..cbc132a088d
--- /dev/null
+++ b/regression-test/plugins/aliyunOssSdk.groovy
@@ -0,0 +1,169 @@
+
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.apache.doris.regression.suite.Suite;
+import org.apache.doris.regression.Config;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.DeleteObjectsResult;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import groovy.util.logging.Slf4j
+
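+// Suite plugin that registers thin wrappers around the Aliyun OSS Java SDK on
+// Suite.metaClass, so regression suites can create an OSS client, list objects,
+// and sum object sizes under a given prefix.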
+Suite.metaClass.initOssClient = { String accessKeyId, String accessKeySecret, String endpoint ->
+ return new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret)
+}
+
+Suite.metaClass.listOssObjectWithPrefix = { OSS client, String bucketName, String prefix="" ->
+ try {
+ ObjectListing objectListing = null;
+ String nextMarker = null;
+ final int maxKeys = 500;
+ List<OSSObjectSummary> sums = null;
+
+ if (!client.doesBucketExist(bucketName)) {
+ logger.info("no bucket named ${bucketName} in ${endpoint}")
+ return
+ }
+
+        // List all objects with the specified prefix by paging; each page returns up to maxKeys (500) entries.
+ logger.info("List all objects with prefix:");
+ nextMarker = null;
+ do {
+            objectListing = client.listObjects(new ListObjectsRequest(bucketName)
+                    .withPrefix(prefix).withMarker(nextMarker).withMaxKeys(maxKeys));
+
+ sums = objectListing.getObjectSummaries();
+ for (OSSObjectSummary s : sums) {
+ logger.info("\t" + s.getKey());
+ }
+
+ nextMarker = objectListing.getNextMarker();
+
+ } while (objectListing.isTruncated());
+ } catch (OSSException oe) {
+ logger.error("Caught an OSSException, which means your request made it
to OSS, "
+ + "but was rejected with an error response for some reason.");
+ logger.error("Error Message: " + oe.getErrorMessage());
+ logger.error("Error Code: " + oe.getErrorCode());
+ logger.error("Request ID: " + oe.getRequestId());
+ logger.error("Host ID: " + oe.getHostId());
+ } catch (ClientException ce) {
+ logger.error("Caught an ClientException, which means the client
encountered "
+ + "a serious internal problem while trying to communicate with
OSS, "
+ + "such as not being able to access the network.");
+ logger.error("Error Message: " + ce.getMessage());
+ } finally {
+        /*
+         * The client is deliberately not shut down here; callers release it
+         * via shutDownOssClient once all requests are finished.
+         */
+        //client.shutdown();
+        logger.info("Done!")
+ }
+
+}
+
+// get file size in a specific directory
+Suite.metaClass.calculateFolderLength = { OSS client, String bucketName, String folder ->
+ long size = 0L;
+ ObjectListing objectListing = null;
+ do {
+        // The default value for MaxKeys is 100; the maximum is 1000
+        ListObjectsRequest request = new ListObjectsRequest(bucketName).withPrefix(folder).withMaxKeys(1000);
+ if (objectListing != null) {
+ request.setMarker(objectListing.getNextMarker());
+ }
+ objectListing = client.listObjects(request);
+ List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
+ for (OSSObjectSummary s : sums) {
+ size += s.getSize();
+ }
+ } while (objectListing.isTruncated());
+ return size;
+}
+
+Suite.metaClass.shutDownOssClient = { OSS client ->
+ client.shutdown();
+}
+
+
+
+Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, String prefix="" ->
+ try {
+ if (!client.doesBucketExist(bucketName)) {
+ logger.info("no bucket named ${bucketName} in ${endpoint}")
+ return
+ }
+
+        // List all objects with the specified prefix by paging; by default each page returns up to 100 entries.
+ logger.info("List all objects with prefix:");
+ ObjectListing objectListing = null;
+ do {
+ // By default, list 100 files or directories at a time
+            ListObjectsRequest request = new ListObjectsRequest(bucketName).withDelimiter("/").withPrefix(prefix);
+ if (objectListing != null) {
+ request.setMarker(objectListing.getNextMarker());
+ }
+ objectListing = client.listObjects(request);
+ List<String> folders = objectListing.getCommonPrefixes();
+ for (String folder : folders) {
+                logger.info(folder + " : " + (calculateFolderLength(client, bucketName, folder) / (1024 * 1024 * 1024)) + "GB");
+ }
+ List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
+ for (OSSObjectSummary s : sums) {
+                logger.info(s.getKey() + " : " + (s.getSize() / (1024 * 1024 * 1024)) + "GB");
+ }
+ } while (objectListing.isTruncated());
+
+ } catch (OSSException oe) {
+ logger.error("Caught an OSSException, which means your request made it
to OSS, "
+ + "but was rejected with an error response for some reason.");
+ logger.error("Error Message: " + oe.getErrorMessage());
+ logger.error("Error Code: " + oe.getErrorCode());
+ logger.error("Request ID: " + oe.getRequestId());
+ logger.error("Host ID: " + oe.getHostId());
+ } catch (ClientException ce) {
+ logger.error("Caught an ClientException, which means the client
encountered "
+ + "a serious internal problem while trying to communicate with
OSS, "
+ + "such as not being able to access the network.");
+ logger.error("Error Message: " + ce.getMessage());
+ } finally {
+        /*
+         * The client is deliberately not shut down here; callers release it
+         * via shutDownOssClient once all requests are finished.
+         */
+        //client.shutdown();
+        logger.info("Done!")
+ }
+}
+
+
+
diff --git a/regression-test/suites/show_data/ddl/lineitem_delete.sql b/regression-test/suites/show_data/ddl/lineitem_delete.sql
new file mode 100644
index 00000000000..df8b0405649
--- /dev/null
+++ b/regression-test/suites/show_data/ddl/lineitem_delete.sql
@@ -0,0 +1,2 @@
+DELETE from ${table} where L_ORDERKEY >= 0;
+
diff --git a/regression-test/suites/show_data/ddl/lineitem_dup.sql b/regression-test/suites/show_data/ddl/lineitem_dup.sql
new file mode 100644
index 00000000000..29ed215d236
--- /dev/null
+++ b/regression-test/suites/show_data/ddl/lineitem_dup.sql
@@ -0,0 +1,25 @@
+CREATE TABLE IF NOT EXISTS lineitem_dup (
+ L_ORDERKEY INTEGER NOT NULL,
+ L_PARTKEY INTEGER NOT NULL,
+ L_SUPPKEY INTEGER NOT NULL,
+ L_LINENUMBER INTEGER NOT NULL,
+ L_QUANTITY DECIMAL(15,2) NOT NULL,
+ L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+ L_DISCOUNT DECIMAL(15,2) NOT NULL,
+ L_TAX DECIMAL(15,2) NOT NULL,
+ L_RETURNFLAG CHAR(1) NOT NULL,
+ L_LINESTATUS CHAR(1) NOT NULL,
+ L_SHIPDATE DATE NOT NULL,
+ L_COMMITDATE DATE NOT NULL,
+ L_RECEIPTDATE DATE NOT NULL,
+ L_SHIPINSTRUCT CHAR(25) NOT NULL,
+ L_SHIPMODE CHAR(10) NOT NULL,
+ L_COMMENT VARCHAR(44) NOT NULL
+)
+DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+PROPERTIES (
+ "replication_num" = "1"
+)
+
+
diff --git a/regression-test/suites/show_data/ddl/lineitem_mow.sql b/regression-test/suites/show_data/ddl/lineitem_mow.sql
new file mode 100644
index 00000000000..1d29f44ac8f
--- /dev/null
+++ b/regression-test/suites/show_data/ddl/lineitem_mow.sql
@@ -0,0 +1,25 @@
+CREATE TABLE IF NOT EXISTS lineitem_mow (
+ L_ORDERKEY INTEGER NOT NULL,
+ L_PARTKEY INTEGER NOT NULL,
+ L_SUPPKEY INTEGER NOT NULL,
+ L_LINENUMBER INTEGER NOT NULL,
+ L_QUANTITY DECIMAL(15,2) NOT NULL,
+ L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+ L_DISCOUNT DECIMAL(15,2) NOT NULL,
+ L_TAX DECIMAL(15,2) NOT NULL,
+ L_RETURNFLAG CHAR(1) NOT NULL,
+ L_LINESTATUS CHAR(1) NOT NULL,
+ L_SHIPDATE DATE NOT NULL,
+ L_COMMITDATE DATE NOT NULL,
+ L_RECEIPTDATE DATE NOT NULL,
+ L_SHIPINSTRUCT CHAR(25) NOT NULL,
+ L_SHIPMODE CHAR(10) NOT NULL,
+ L_COMMENT VARCHAR(44) NOT NULL
+)
+UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+PROPERTIES (
+ "replication_num" = "1"
+)
+
+
diff --git a/regression-test/suites/show_data/test_show_mow_data.groovy b/regression-test/suites/show_data/test_show_mow_data.groovy
new file mode 100644
index 00000000000..c94e5786d7e
--- /dev/null
+++ b/regression-test/suites/show_data/test_show_mow_data.groovy
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This case is adapted from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/tpcds
+// and modified by Doris.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Load the same data 10 times; the reported data size should not keep growing.
+suite("test_mow_show_data_in_cloud","p2") {
+ //cloud-mode
+ if (!isCloudMode()) {
+ logger.info("not cloud mode, not run")
+ return
+ }
+
+ def repeate_stream_load_same_data = { String tableName, int loadTimes ->
+ for (int i = 0; i < loadTimes; i++) {
+ streamLoad {
+ table tableName
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+                file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+ }
+
+ def get_tablets_from_table = { String table ->
+ def res = sql_return_maparray """show tablets from ${table}"""
+ return res
+ }
+
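+    // The "CompactionStatus" column of SHOW TABLETS is a BE HTTP URL; curl it
+    // to fetch the tablet's compaction status JSON, including its rowsets.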
+ def show_tablet_compaction = { HashMap tablet ->
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET ")
+ sb.append(tablet["CompactionStatus"])
+ String command = sb.toString()
+ logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+        def out = process.getText()
+        logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ return parseJson(out.trim())
+ }
+
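+    // Trigger a base/cumulative/full compaction through the BE HTTP API, then
+    // poll the status URL until the tablet's rowset count drops (compaction
+    // finished) or a 5-minute timeout expires.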
+ def trigger_tablet_compaction = { HashMap tablet, String compact_type ->
+ //support trigger base/cumulative/full compaction
+ def tabletStatusBeforeCompaction = show_tablet_compaction(tablet)
+
+        String showCompactionStatus = tablet["CompactionStatus"]
+        String triggerCompactionUrl = showCompactionStatus.split("show")[0] + "run?tablet_id=" + tablet["TabletId"] + "&compact_type=" + compact_type
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X POST ")
+ sb.append(triggerCompactionUrl)
+ String command = sb.toString()
+ logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+        def out = process.getText()
+        def outJson = parseJson(out)
+        logger.info("Trigger compaction: code=" + code + ", out=" + out + ", err=" + err)
+        // code == 0 with status "success" means the compaction request was accepted;
+        // any other result usually means there is no suitable compaction to run
+        if ( code == 0 && outJson.status.toLowerCase() == "success" ){
+            def compactionStatus = "RUNNING"
+            def tabletStatusAfterCompaction = null
+            long startTime = System.currentTimeMillis()
+            long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
+            do {
+                tabletStatusAfterCompaction = show_tablet_compaction(tablet)
+                logger.info("tablet status after compaction: " + tabletStatusAfterCompaction.toString())
+                if (tabletStatusAfterCompaction.rowsets.size() < tabletStatusBeforeCompaction.rowsets.size()){
+                    compactionStatus = 'FINISHED'
+                }
+                Thread.sleep(60 * 1000)
+            } while (timeoutTimestamp > System.currentTimeMillis() && compactionStatus != 'FINISHED')
+
+            if (compactionStatus != "FINISHED") {
+                logger.info("compaction did not finish within the timeout or failed")
+                return false
+            }
+ }
+ }
+
+ def trigger_compaction = { List<List<Object>> tablets ->
+ for(def tablet: tablets) {
+ trigger_tablet_compaction(tablet, "cumulative")
+ trigger_tablet_compaction(tablet, "base")
+ trigger_tablet_compaction(tablet, "full")
+ }
+ }
+
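+    // Sum the sizes of all objects under <prefix>/data/<tablet_id> in the
+    // cluster's backend object storage; this storage-side view is what SHOW
+    // DATA and the compaction API results are checked against.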
+    def caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
+        def storageType = context.config.otherConfigs.get("storageProvider")
+ Double storageSize = 0
+
+ List<String> tabletIds = []
+ for(def tablet: tablets) {
+ tabletIds.add(tablet["TabletId"])
+ }
+
+ if (storageType.toLowerCase() == "oss") {
+ //cbs means cluster backend storage
+ ak = context.config.otherConfigs.get("cbsS3Ak")
+ sk = context.config.otherConfigs.get("cbsS3Sk")
+ endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
+ bucketName = context.config.otherConfigs.get("cbsS3Bucket")
+ storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")
+
+ client = initOssClient(ak, sk, endpoint)
+ for(String tabletId: tabletIds) {
+ storageSize += calculateFolderLength(client, bucketName,
storagePrefix + "/data/" + tabletId)
+ }
+ shutDownOssClient(client)
+ }
+
+ if (storageType.toLowerCase() == "hdfs") {
+ fsName = context.config.otherConfigs.get("cbsFsName")
+ isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
+ fsUser = context.config.otherConfigs.get("cbsFsUser")
+ storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
+ }
+
+ return storageSize
+ }
+
+ def translate_different_unit_to_MB = { String size, String unitField ->
+        Double sizeMb = 0.0
+        if (unitField == "KB") {
+            sizeMb = Double.parseDouble(size) / 1024
+        } else if (unitField == "MB") {
+            sizeMb = Double.parseDouble(size)
+        } else if (unitField == "GB") {
+            sizeMb = Double.parseDouble(size) * 1024
+        } else if (unitField == "TB") {
+            sizeMb = Double.parseDouble(size) * 1024 * 1024
+        }
+        return sizeMb
+ }
+
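+    // Parse the human-readable Size column of SHOW DATA ("<number> <unit>",
+    // e.g. "906.000 MB") and normalize it to MB.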
+ def show_table_data_size_through_mysql = { String table ->
+ def mysqlShowDataSize = 0L
+        def res = sql_return_maparray "show data from ${table}"
+ def tableSizeInfo = res[0]
+ def fields = tableSizeInfo["Size"].split(" ")
+        if (fields.length == 2) {
+            def sizeField = fields[0]
+            def unitField = fields[1]
+            mysqlShowDataSize = translate_different_unit_to_MB(sizeField, unitField)
+        }
+ }
+ return mysqlShowDataSize
+ }
+
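+    // Sum the rowset sizes reported in each tablet's compaction status and
+    // normalize them to MB; each rowset description ends with "<size> <unit>".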
+ def caculate_table_data_size_through_api = { List<List<Object>> tablets ->
+ Double apiCaculateSize = 0
+ for (HashMap tablet in tablets) {
+ def tabletStatus = show_tablet_compaction(tablet)
+
+ for(String rowset: tabletStatus.rowsets){
+ def fields = rowset.split(" ")
+                if (fields.length == 7) {
+                    def sizeField = fields[-2] // second-to-last field (size value)
+                    def unitField = fields[-1] // last field (unit)
+                    // convert to MB
+                    apiCaculateSize += translate_different_unit_to_MB(sizeField, unitField)
+                }
+ }
+ }
+
+ return apiCaculateSize
+ }
+
+ def main = {
+        def tableName = "lineitem_mow"
+ sql "DROP TABLE IF EXISTS ${tableName};"
+ sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+        sql new File("""${context.file.parent}/ddl/lineitem_delete.sql""").text.replaceAll("\\\$\\{table\\}", tableName)
+ List<String> tablets = get_tablets_from_table(tableName)
+ def loadTimes = [1, 10]
+ Map<String, List> sizeRecords = ["apiSize":[], "mysqlSize":[],
"cbsSize":[]]
+ for (int i in loadTimes){
+            // stream load the same data i times, then record each size
+            repeate_stream_load_same_data(tableName, i)
+            def rows = sql_return_maparray "select count(*) as count from ${tableName};"
+ logger.info("table ${tableName} has ${rows[0]["count"]} rows")
+            // trigger compaction so the reported size is stable
+ trigger_compaction(tablets)
+
+            // then sleep 5 min to wait until FE finishes reporting
+            sleep(300 * 1000)
+
+            sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
+            sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
+            sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
+ sleep(300 * 1000)
+ logger.info("after ${i} times stream load, mysqlSize is:
${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]},
storageSize is: ${sizeRecords["cbsSize"][-1]}")
+
+ }
+
+ // expect mysqlSize == apiSize == storageSize
+ assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["apiSize"][0])
+ assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["cbsSize"][0])
+        // expect the size after loading once to equal the size after loading 10 times
+ assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["mysqlSize"][1])
+ assertEquals(sizeRecords["apiSize"][0], sizeRecords["apiSize"][1])
+ assertEquals(sizeRecords["cbsSize"][0], sizeRecords["cbsSize"][1])
+ }
+
+ main()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]