This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f6ff621a416 [fix](inverted index) Modify Error Handling for File Open
Failure (#44551)
f6ff621a416 is described below
commit f6ff621a416224b705e35dc456c3d4a08df40fb2
Author: zzzxl <[email protected]>
AuthorDate: Wed Nov 27 15:59:10 2024 +0800
[fix](inverted index) Modify Error Handling for File Open Failure (#44551)
Problem Summary:
Modify Error Handling for File Open Failure
---
be/src/olap/compaction.cpp | 26 ++-
...st_skip_index_compaction_fault_injection.groovy | 240 +++++++++++++++++++++
2 files changed, 261 insertions(+), 5 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 68ed0322a9e..e71e1862dc8 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -662,15 +662,28 @@ Status Compaction::do_inverted_index_compaction() {
try {
std::vector<std::unique_ptr<DorisCompoundReader>>
src_idx_dirs(src_segment_num);
for (int src_segment_id = 0; src_segment_id < src_segment_num;
src_segment_id++) {
- src_idx_dirs[src_segment_id] =
-
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
+ auto res =
inverted_index_file_readers[src_segment_id]->open(index_meta);
+ DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_reader",
{
+ res =
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "debug point: Compaction::open_index_file_reader
error"));
+ })
+ if (!res.has_value()) {
+ throw
Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
+ }
+ src_idx_dirs[src_segment_id] = std::move(res.value());
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num;
dest_segment_id++) {
- auto dest_dir =
-
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
+ auto res =
inverted_index_file_writers[dest_segment_id]->open(index_meta);
+ DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_writer",
{
+ res =
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "debug point:
Compaction::open_inverted_index_file_writer error"));
+ })
+ if (!res.has_value()) {
+ throw
Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
+ }
// Destination directories in dest_index_dirs do not need to
be deconstructed,
// but their lifecycle must be managed by
inverted_index_file_writers.
- dest_index_dirs[dest_segment_id] = dest_dir.get();
+ dest_index_dirs[dest_segment_id] = res.value().get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs,
dest_index_dirs,
index_tmp_path.native(), trans_vec,
dest_segment_num_rows);
@@ -681,6 +694,9 @@ Status Compaction::do_inverted_index_compaction() {
} catch (CLuceneError& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
+ } catch (const Exception& e) {
+ error_handler(index_meta->index_id(), column_uniq_id);
+ status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
}
}
diff --git
a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
new file mode 100644
index 00000000000..cdc2bdda417
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_skip_index_compaction_fault_injection", "nonConcurrent") {
+ def isCloudMode = isCloudMode()
+ def tableName1 = "test_skip_index_compaction_fault_injection_1"
+ def tableName2 = "test_skip_index_compaction_fault_injection_2"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ sql "DROP TABLE IF EXISTS ${tableName1}"
+ sql """
+ CREATE TABLE ${tableName1} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "inverted_index_storage_format" = "V1"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${tableName2}"
+ sql """
+ CREATE TABLE ${tableName2} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "inverted_index_storage_format" = "V2"
+ );
+ """
+
+ boolean disableAutoCompaction = false
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ", err=" +
err)
+ }
+ }
+
+ def trigger_full_compaction_on_tablets = { tablets ->
+ for (def tablet : tablets) {
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ int times = 1
+
+ String compactionStatus;
+ do{
+ def (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
+ ++times
+ sleep(2000)
+ compactionStatus = parseJson(out.trim()).status.toLowerCase();
+ } while (compactionStatus!="success" && times<=10 &&
compactionStatus!="e-6010")
+
+
+ if (compactionStatus == "fail") {
+ assertEquals(disableAutoCompaction, false)
+ logger.info("Compaction was done automatically!")
+ }
+ if (disableAutoCompaction && compactionStatus!="e-6010") {
+ assertEquals("success", compactionStatus)
+ }
+ }
+ }
+
+ def wait_full_compaction_done = { tablets ->
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ def (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+ }
+
+ def get_rowset_count = { tablets ->
+ int rowsetCount = 0
+ for (def tablet in tablets) {
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ rowsetCount +=((List<String>) tabletJson.rowsets).size()
+ }
+ return rowsetCount
+ }
+
+ def check_config = { String key, String value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == key) {
+ assertEquals(value, ((List<String>) ele)[2])
+ }
+ }
+ }
+ }
+
+ def run_test = { tableName ->
+ sql """ INSERT INTO ${tableName} VALUES (1, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (4, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (5, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (6, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (7, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (8, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (9, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+ sql """ INSERT INTO ${tableName} VALUES (10, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
+
+ sql "sync"
+
+ def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+ logger.info("tablets: {}", tablets)
+
+ int replicaNum = 1
+ def dedup_tablets = deduplicate_tablets(tablets)
+ if (dedup_tablets.size() > 0) {
+ replicaNum = Math.round(tablets.size() / dedup_tablets.size())
+ if (replicaNum != 1 && replicaNum != 3) {
+ assert(false)
+ }
+ }
+
+ int rowsetCount = get_rowset_count.call(tablets);
+ assert (rowsetCount == 11 * replicaNum)
+
+ // first
+ trigger_full_compaction_on_tablets.call(tablets)
+ wait_full_compaction_done.call(tablets)
+
+ rowsetCount = get_rowset_count.call(tablets);
+ assert (rowsetCount == 11 * replicaNum)
+
+ // second
+ trigger_full_compaction_on_tablets.call(tablets)
+ wait_full_compaction_done.call(tablets)
+
+ rowsetCount = get_rowset_count.call(tablets);
+ if (isCloudMode) {
+ assert (rowsetCount == (1 + 1) * replicaNum)
+ } else {
+ assert (rowsetCount == 1 * replicaNum)
+ }
+ }
+
+ boolean invertedIndexCompactionEnable = false
+ boolean has_update_be_config = false
+ try {
+ String backend_id;
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "inverted_index_compaction_enable") {
+ invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>)
ele)[2])
+ logger.info("inverted_index_compaction_enable: ${((List<String>)
ele)[2]}")
+ }
+ }
+ set_be_config.call("inverted_index_compaction_enable", "true")
+ has_update_be_config = true
+ check_config.call("inverted_index_compaction_enable", "true");
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("Compaction::open_inverted_index_file_reader")
+ run_test.call(tableName1)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_reader")
+ }
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer")
+ run_test.call(tableName2)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer")
+ }
+ } finally {
+ if (has_update_be_config) {
+ set_be_config.call("inverted_index_compaction_enable",
invertedIndexCompactionEnable.toString())
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]