This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9bcc5ce569d [Improvement]Add schema table backend_active_tasks (#31945)
9bcc5ce569d is described below
commit 9bcc5ce569df0fc49fae792facc82b8ceb6f0f83
Author: wangbo <[email protected]>
AuthorDate: Sat Mar 9 10:19:07 2024 +0800
[Improvement]Add schema table backend_active_tasks (#31945)
---
be/src/exec/schema_scanner.cpp | 3 +
.../schema_scanner/schema_backend_active_tasks.cpp | 94 ++++++++++++++++++++++
.../schema_scanner/schema_backend_active_tasks.h | 49 +++++++++++
be/src/runtime/runtime_query_statistics_mgr.cpp | 81 +++++++++----------
be/src/runtime/runtime_query_statistics_mgr.h | 8 +-
be/src/vec/exec/scan/vmeta_scanner.cpp | 22 +----
be/src/vec/exec/scan/vmeta_scanner.h | 1 -
.../org/apache/doris/analysis/SchemaTableType.java | 3 +-
.../doris/catalog/BuiltinTableValuedFunctions.java | 4 +-
.../java/org/apache/doris/catalog/SchemaTable.java | 14 ++++
.../expressions/functions/table/ActiveBeTasks.java | 58 -------------
.../visitor/TableValuedFunctionVisitor.java | 5 --
.../planner/BackendPartitionedSchemaScanNode.java | 12 ++-
.../ActiveBeTasksTableValuedFunction.java | 76 -----------------
.../doris/tablefunction/TableValuedFunctionIf.java | 2 -
.../doris/datasource/RefreshCatalogTest.java | 4 +-
gensrc/thrift/Descriptors.thrift | 3 +-
gensrc/thrift/Types.thrift | 3 +-
.../jdbc/test_mariadb_jdbc_catalog.out | 1 +
.../jdbc/test_mysql_jdbc_catalog.out | 1 +
.../jdbc/test_mysql_jdbc_catalog_nereids.out | 1 +
.../meta_scan/test_backend_active_tasks.groovy | 43 ++++++++++
22 files changed, 271 insertions(+), 217 deletions(-)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index b700d36f209..bff59130e8b 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -26,6 +26,7 @@
#include <ostream>
#include <utility>
+#include "exec/schema_scanner/schema_backend_active_tasks.h"
#include "exec/schema_scanner/schema_charsets_scanner.h"
#include "exec/schema_scanner/schema_collations_scanner.h"
#include "exec/schema_scanner/schema_columns_scanner.h"
@@ -149,6 +150,8 @@ std::unique_ptr<SchemaScanner>
SchemaScanner::create(TSchemaTableType::type type
return SchemaMetadataNameIdsScanner::create_unique();
case TSchemaTableType::SCH_PROFILING:
return SchemaProfilingScanner::create_unique();
+ case TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS:
+ return SchemaBackendActiveTasksScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
new file mode 100644
index 00000000000..c5f8825c2e4
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_backend_active_tasks.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+std::vector<SchemaScanner::ColumnDesc>
SchemaBackendActiveTasksScanner::_s_tbls_columns = {
+ // name, type, size
+ {"BE_ID", TYPE_BIGINT, sizeof(StringRef), false},
+ {"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
+ {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
+ {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+ {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+ {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+ {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+ {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+ {"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+};
+
+SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
+ : SchemaScanner(_s_tbls_columns,
TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS) {}
+
+SchemaBackendActiveTasksScanner::~SchemaBackendActiveTasksScanner() {}
+
+Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
+ _block_rows_limit = state->batch_size();
+ return Status::OK();
+}
+
+Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block*
block, bool* eos) {
+ if (!_is_init) {
+ return Status::InternalError("Used before initialized.");
+ }
+
+ if (nullptr == block || nullptr == eos) {
+ return Status::InternalError("input pointer is nullptr.");
+ }
+
+ if (_task_stats_block == nullptr) {
+ _task_stats_block = vectorized::Block::create_unique();
+
+ for (int i = 0; i < _s_tbls_columns.size(); ++i) {
+ TypeDescriptor descriptor(_s_tbls_columns[i].type);
+ auto data_type =
+
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+ _task_stats_block->insert(vectorized::ColumnWithTypeAndName(
+ data_type->create_column(), data_type,
_s_tbls_columns[i].name));
+ }
+
+ _task_stats_block->reserve(_block_rows_limit);
+
+
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block(
+ _task_stats_block.get());
+ _total_rows = _task_stats_block->rows();
+ }
+
+ if (_row_idx == _total_rows) {
+ *eos = true;
+ return Status::OK();
+ }
+
+ int current_batch_rows = std::min(_block_rows_limit, _total_rows -
_row_idx);
+ vectorized::MutableBlock mblock =
vectorized::MutableBlock::build_mutable_block(block);
+ mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows);
+ _row_idx += current_batch_rows;
+
+ *eos = _row_idx == _total_rows;
+ return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.h
b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
new file mode 100644
index 00000000000..d8a2a1ffa3f
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaBackendActiveTasksScanner : public SchemaScanner {
+ ENABLE_FACTORY_CREATOR(SchemaBackendActiveTasksScanner);
+
+public:
+ SchemaBackendActiveTasksScanner();
+ ~SchemaBackendActiveTasksScanner() override;
+
+ Status start(RuntimeState* state) override;
+ Status get_next_block(vectorized::Block* block, bool* eos) override;
+
+ static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+ int _block_rows_limit = 4096;
+ int _row_idx = 0;
+ int _total_rows = 0;
+ std::unique_ptr<vectorized::Block> _task_stats_block = nullptr;
+};
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index ee09b0c30dc..9764b0f0507 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -21,6 +21,7 @@
#include "runtime/exec_env.h"
#include "util/debug_util.h"
#include "util/time.h"
+#include "vec/core/block.h"
namespace doris {
@@ -199,54 +200,52 @@ void
RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64
}
}
-std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics(
- std::vector<std::string> filter_columns) {
+void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block*
block) {
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
- std::vector<TRow> table_rows;
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+ auto insert_int_value = [&](int col_index, int64_t int_val,
vectorized::Block* block) {
+ vectorized::MutableColumnPtr mutable_col_ptr;
+ mutable_col_ptr =
std::move(*block->get_by_position(col_index).column).assume_mutable();
+ auto* nullable_column =
+
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+ vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
+ int_val);
+ nullable_column->get_null_map_data().emplace_back(0);
+ };
+
+ auto insert_string_value = [&](int col_index, std::string str_val,
vectorized::Block* block) {
+ vectorized::MutableColumnPtr mutable_col_ptr;
+ mutable_col_ptr =
std::move(*block->get_by_position(col_index).column).assume_mutable();
+ auto* nullable_column =
+
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+ vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
+
str_val.size());
+ nullable_column->get_null_map_data().emplace_back(0);
+ };
+
+ // block's schema come from
SchemaBackendActiveTasksScanner::_s_tbls_columns
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
- TRow trow;
-
TQueryStatistics tqs;
qs_ctx_ptr->collect_query_statistics(&tqs);
-
- for (auto iter = filter_columns.begin(); iter != filter_columns.end();
iter++) {
- std::string col_name = *iter;
-
- TCell tcell;
- if (col_name == "beid") {
- tcell.longVal = be_id;
- } else if (col_name == "fehost") {
- tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname;
- } else if (col_name == "queryid") {
- tcell.stringVal = query_id;
- } else if (col_name == "tasktimems") {
- if (qs_ctx_ptr->_is_query_finished) {
- tcell.longVal = qs_ctx_ptr->_query_finish_time -
qs_ctx_ptr->_query_start_time;
- } else {
- tcell.longVal = MonotonicMillis() -
qs_ctx_ptr->_query_start_time;
- }
- } else if (col_name == "taskcputimems") {
- tcell.longVal = tqs.cpu_ms;
- } else if (col_name == "scanrows") {
- tcell.longVal = tqs.scan_rows;
- } else if (col_name == "scanbytes") {
- tcell.longVal = tqs.scan_bytes;
- } else if (col_name == "bepeakmemorybytes") {
- tcell.longVal = tqs.max_peak_memory_bytes;
- } else if (col_name == "currentusedmemorybytes") {
- tcell.longVal = tqs.current_used_memory_bytes;
- } else if (col_name == "shufflesendbytes") {
- tcell.longVal = tqs.shuffle_send_bytes;
- } else if (col_name == "shufflesendRows") {
- tcell.longVal = tqs.shuffle_send_rows;
- }
- trow.column_value.push_back(tcell);
- }
- table_rows.push_back(trow);
+ insert_int_value(0, be_id, block);
+ insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block);
+ insert_string_value(2, query_id, block);
+
+ int64_t task_time = qs_ctx_ptr->_is_query_finished
+ ? qs_ctx_ptr->_query_finish_time -
qs_ctx_ptr->_query_start_time
+ : MonotonicMillis() -
qs_ctx_ptr->_query_start_time;
+ insert_int_value(3, task_time, block);
+ insert_int_value(4, tqs.cpu_ms, block);
+ insert_int_value(5, tqs.scan_rows, block);
+ insert_int_value(6, tqs.scan_bytes, block);
+ insert_int_value(7, tqs.max_peak_memory_bytes, block);
+ insert_int_value(8, tqs.current_used_memory_bytes, block);
+ insert_int_value(9, tqs.shuffle_send_bytes, block);
+ insert_int_value(10, tqs.shuffle_send_rows, block);
}
- return table_rows;
}
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index 44badd196a3..1b3e164d48f 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -28,6 +28,10 @@
namespace doris {
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
class QueryStatisticsCtx {
public:
QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
@@ -68,8 +72,8 @@ public:
void get_metric_map(std::string query_id,
std::map<WorkloadMetricType, std::string>& metric_map);
- // used for tvf active_queries
- std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string>
filter_columns);
+ // used for backend_active_tasks
+ void get_active_be_tasks_block(vectorized::Block* block);
private:
std::shared_mutex _qs_ctx_map_lock;
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 7d438366cf2..22545fa4dce 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -34,7 +34,6 @@
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/thrift_rpc_helper.h"
@@ -96,12 +95,7 @@ Status VMetaScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& conju
return Status::InternalError("Logical error, VMetaScanner do not allow
ColumnNullable");
}
- if (_scan_range.meta_scan_range.metadata_type ==
TMetadataType::ACTIVE_BE_TASKS) {
- // tvf active_be_tasks fetch data in be directly, it does not need to
request FE for data
- RETURN_IF_ERROR(_build_active_be_tasks_data());
- } else {
- RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
- }
+ RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
return Status::OK();
}
@@ -294,20 +288,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange&
meta_scan_range) {
return Status::OK();
}
-Status VMetaScanner::_build_active_be_tasks_data() {
- std::vector<std::string> filter_columns;
- for (const auto& slot : _tuple_desc->slots()) {
- filter_columns.emplace_back(slot->col_name_lower_case());
- }
-
- std::vector<TRow> ret =
-
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics(
- filter_columns);
- _batch_data = std::move(ret);
-
- return Status::OK();
-}
-
Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h
b/be/src/vec/exec/scan/vmeta_scanner.h
index 25cb9345311..518f42ffc1c 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -91,7 +91,6 @@ private:
TFetchSchemaTableDataRequest*
request);
Status _build_queries_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest*
request);
- Status _build_active_be_tasks_data();
bool _meta_eos;
TupleId _tuple_id;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 59bae154d57..7a7d547e240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -69,7 +69,8 @@ public enum SchemaTableType {
SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS),
SCH_PARAMETERS("PARAMETERS", "PARAMETERS",
TSchemaTableType.SCH_PARAMETERS),
SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS",
TSchemaTableType.SCH_METADATA_NAME_IDS),
- SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING);
+ SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING),
+ SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS",
TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS);
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index acdeb683f26..ac1b31fceac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -17,7 +17,6 @@
package org.apache.doris.catalog;
-import
org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
import
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
@@ -60,8 +59,7 @@ public class BuiltinTableValuedFunctions implements
FunctionHelper {
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),
tableValued(Tasks.class, "tasks"),
- tableValued(WorkloadGroups.class, "workload_groups"),
- tableValued(ActiveBeTasks.class, "active_be_tasks")
+ tableValued(WorkloadGroups.class, "workload_groups")
);
public static final BuiltinTableValuedFunctions INSTANCE = new
BuiltinTableValuedFunctions();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 7215cf0fc76..9e068418610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -444,6 +444,20 @@ public class SchemaTable extends Table {
.column("SOURCE_FILE",
ScalarType.createVarchar(20))
.column("SOURCE_LINE",
ScalarType.createType(PrimitiveType.INT))
.build()))
+ .put("backend_active_tasks",
+ new SchemaTable(SystemIdGenerator.getNextId(),
"backend_active_tasks", TableType.SCHEMA,
+ builder().column("BE_ID",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("FE_HOST",
ScalarType.createVarchar(256))
+ .column("QUERY_ID",
ScalarType.createVarchar(256))
+ .column("TASK_TIME_MS",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("TASK_CPU_TIME_MS",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("SCAN_ROWS",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("SCAN_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("BE_PEAK_MEMORY_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("CURRENT_USED_MEMORY_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("SHUFFLE_SEND_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("SHUFFLE_SEND_ROWS",
ScalarType.createType(PrimitiveType.BIGINT))
+ .build()))
.build();
protected SchemaTable(long id, String name, TableType type, List<Column>
baseSchema) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
deleted file mode 100644
index 5737f52a2b9..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.expressions.functions.table;
-
-import org.apache.doris.catalog.FunctionSignature;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
-import org.apache.doris.nereids.types.coercion.AnyDataType;
-import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction;
-import org.apache.doris.tablefunction.TableValuedFunctionIf;
-
-import java.util.Map;
-
-/**
- * stands be running tasks status, currently main including
select/streamload/broker load/insert select
- */
-public class ActiveBeTasks extends TableValuedFunction {
-
- public ActiveBeTasks(Properties properties) {
- super("active_be_tasks", properties);
- }
-
- @Override
- public FunctionSignature customSignature() {
- return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX,
getArgumentsTypes());
- }
-
- @Override
- protected TableValuedFunctionIf toCatalogFunction() {
- try {
- Map<String, String> arguments = getTVFProperties().getMap();
- return new ActiveBeTasksTableValuedFunction(arguments);
- } catch (Throwable t) {
- throw new AnalysisException("Can not build
ActiveBeTasksTableValuedFunction by "
- + this + ": " + t.getMessage(), t);
- }
- }
-
- public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
- return visitor.visitActiveBeTasks(this, context);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 36561e5b12c..fba34d48168 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -17,7 +17,6 @@
package org.apache.doris.nereids.trees.expressions.visitor;
-import
org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
import
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
@@ -103,8 +102,4 @@ public interface TableValuedFunctionVisitor<R, C> {
default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
return visitTableValuedFunction(workloadGroups, context);
}
-
- default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) {
- return visitTableValuedFunction(beTasks, context);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index 592fc3c96cf..dc57b67d98a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -41,8 +41,10 @@ import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* The BackendSchemaScanNode used for those SchemaTable which data are need to
acquire from backends.
@@ -51,10 +53,16 @@ import java.util.Map;
* So, we can use partitionInfo to select the necessary `be` to send query.
*/
public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
- public static final String ROWSETS = "rowsets";
+
+ public static final Set<String> BACKEND_TABLE = new HashSet<>();
+
+ static {
+ BACKEND_TABLE.add("rowsets");
+ BACKEND_TABLE.add("backend_active_tasks");
+ }
public static boolean isBackendPartitionedSchemaTable(String tableName) {
- if (tableName.equalsIgnoreCase(ROWSETS)) {
+ if (BACKEND_TABLE.contains(tableName.toLowerCase())) {
return true;
}
return false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
deleted file mode 100644
index 99a8ba4886f..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.tablefunction;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.thrift.TMetaScanRange;
-import org.apache.doris.thrift.TMetadataType;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-import java.util.Map;
-
-public class ActiveBeTasksTableValuedFunction extends
MetadataTableValuedFunction {
-
- public static final String NAME = "active_be_tasks";
-
- private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
- new Column("BeId", PrimitiveType.BIGINT),
- new Column("FeHost", ScalarType.createStringType()),
- new Column("QueryId", ScalarType.createStringType()),
- new Column("TaskTimeMs", PrimitiveType.BIGINT),
- new Column("TaskCpuTimeMs", PrimitiveType.BIGINT),
- new Column("ScanRows", PrimitiveType.BIGINT),
- new Column("ScanBytes", PrimitiveType.BIGINT),
- new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
- new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
- new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
- new Column("ShuffleSendRows", PrimitiveType.BIGINT));
-
- public ActiveBeTasksTableValuedFunction(Map<String, String> params) throws
AnalysisException {
- if (params.size() != 0) {
- throw new AnalysisException("ActiveBeTasks table-valued-function
does not support any params");
- }
- }
-
- @Override
- public TMetadataType getMetadataType() {
- return TMetadataType.ACTIVE_BE_TASKS;
- }
-
- @Override
- public TMetaScanRange getMetaScanRange() {
- TMetaScanRange metaScanRange = new TMetaScanRange();
- metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS);
- return metaScanRange;
- }
-
- @Override
- public String getTableName() {
- return "ActiveBeTasksTableValuedFunction";
- }
-
- @Override
- public List<Column> getTableColumns() throws AnalysisException {
- return SCHEMA;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 4b755a97bf8..f9fb76a9666 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -78,8 +78,6 @@ public abstract class TableValuedFunctionIf {
return new ActiveQueriesTableValuedFunction(params);
case WorkloadSchedPolicyTableValuedFunction.NAME:
return new WorkloadSchedPolicyTableValuedFunction(params);
- case ActiveBeTasksTableValuedFunction.NAME:
- return new ActiveBeTasksTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " +
funcName);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
index f5244005c16..5b4c15e876e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
@@ -87,7 +87,7 @@ public class RefreshCatalogTest extends TestWithFeService {
List<String> dbNames2 = test1.getDbNames();
Assertions.assertEquals(4, dbNames2.size());
ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase)
test1.getDb(InfoSchemaDb.DATABASE_NAME).get();
- Assertions.assertEquals(27, infoDb.getTables().size());
+ Assertions.assertEquals(28, infoDb.getTables().size());
TestExternalDatabase testDb = (TestExternalDatabase)
test1.getDb("db1").get();
Assertions.assertEquals(2, testDb.getTables().size());
@@ -96,7 +96,7 @@ public class RefreshCatalogTest extends TestWithFeService {
CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class);
test1 = mgr2.getCatalog("test1");
infoDb = (ExternalInfoSchemaDatabase)
test1.getDb(InfoSchemaDb.DATABASE_NAME).get();
- Assertions.assertEquals(27, infoDb.getTables().size());
+ Assertions.assertEquals(28, infoDb.getTables().size());
testDb = (TestExternalDatabase) test1.getDb("db1").get();
Assertions.assertEquals(2, testDb.getTables().size());
}
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 5e77729becd..4e038aecb89 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -125,7 +125,8 @@ enum TSchemaTableType {
SCH_COLUMN_STATISTICS,
SCH_PARAMETERS,
SCH_METADATA_NAME_IDS,
- SCH_PROFILING;
+ SCH_PROFILING,
+ SCH_BACKEND_ACTIVE_TASKS;
}
enum THdfsCompression {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 04a1fd35163..2cd6af9f050 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -715,8 +715,7 @@ enum TMetadataType {
JOBS,
TASKS,
QUERIES,
- WORKLOAD_SCHED_POLICY,
- ACTIVE_BE_TASKS,
+ WORKLOAD_SCHED_POLICY
}
enum TIcebergQueryType {
diff --git
a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
index 0ad76d0db4c..ff09873c18c 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
@@ -23,6 +23,7 @@ mariadb_jdbc_catalog
115 abg
-- !information_schema --
+backend_active_tasks
character_sets
collations
column_privileges
diff --git
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
index 3520a11d8bc..309c753f5d6 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
@@ -192,6 +192,7 @@ bca 2022-11-02 2022-11-02 8012 vivo
2 2
-- !information_schema --
+backend_active_tasks
character_sets
collations
column_privileges
diff --git
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
index a66c418fede..fcbf4f99244 100644
---
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
+++
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
@@ -160,6 +160,7 @@ bca 2022-11-02 2022-11-02 8012 vivo
123456789012345678901234567890123.12345
12345678901234567890123456789012.12345
1234567890123456789012345678901234.12345
123456789012345678901234567890123.12345
123456789012345678901234567890123456789012345678901234567890.12345
123456789012345678901234567890123456789012345678901234567890.12345
-- !information_schema --
+backend_active_tasks
character_sets
collations
column_privileges
diff --git
a/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy
b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy
new file mode 100644
index 00000000000..172a7bbf850
--- /dev/null
+++ b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backend_active_tasks") {
+ def thread1 = new Thread({
+ while(true) {
+ // non-pipeline
+ sql "set experimental_enable_pipeline_engine=false"
+ sql "set experimental_enable_pipeline_x_engine=false"
+ sql "select * from information_schema.backend_active_tasks"
+ sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from
information_schema.backend_active_tasks"
+
+ // pipeline
+ sql "set experimental_enable_pipeline_engine=true"
+ sql "set experimental_enable_pipeline_x_engine=false"
+ sql "select * from information_schema.backend_active_tasks"
+ sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from
information_schema.backend_active_tasks"
+
+ // pipelinex
+ sql "set experimental_enable_pipeline_engine=true"
+ sql "set experimental_enable_pipeline_x_engine=true"
+ sql "select * from information_schema.backend_active_tasks"
+ sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from
information_schema.backend_active_tasks"
+ Thread.sleep(1000)
+ }
+ })
+ thread1.setDaemon(true)
+ thread1.start()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]