This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 3c79c56e29a [bugfix](schema_change) Fix the coredump when doubly write
during schema change #22557 (#25783)
3c79c56e29a is described below
commit 3c79c56e29a4a4ab2a94066574b6c1a462aa8e8e
Author: Lightman <[email protected]>
AuthorDate: Tue Oct 24 14:01:53 2023 +0800
[bugfix](schema_change) Fix the coredump when doubly write during schema
change #22557 (#25783)
---
be/src/exec/tablet_info.cpp | 83 +++++++++++-------
be/src/olap/tablet_schema.cpp | 3 +
be/src/olap/tablet_schema.h | 4 +
be/src/runtime/descriptors.cpp | 4 +
be/src/runtime/descriptors.h | 3 +
be/src/runtime/primitive_type.cpp | 8 ++
.../org/apache/doris/analysis/SlotDescriptor.java | 5 +-
.../main/java/org/apache/doris/catalog/Column.java | 2 +-
gensrc/proto/descriptors.proto | 2 +
gensrc/thrift/Descriptors.thrift | 1 +
.../suites/schema_change/ddl/lineorder_create.sql | 24 ++++++
.../suites/schema_change/ddl/lineorder_delete.sql | 1 +
.../test_double_write_when_schema_change.groovy | 99 ++++++++++++++++++++++
13 files changed, 204 insertions(+), 35 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 34bd9053b43..26a39e5ac55 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -29,9 +29,11 @@
#include <ostream>
#include "olap/tablet_schema.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
#include "runtime/memory/mem_tracker.h"
+#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
#include "runtime/types.h"
#include "util/hash_util.hpp"
@@ -67,30 +69,39 @@ Status OlapTableSchemaParam::init(const
POlapTableSchemaParam& pschema) {
for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
}
- std::map<std::string, SlotDescriptor*> slots_map;
+ std::unordered_map<std::pair<std::string, std::string>, SlotDescriptor*>
slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));
-
+ // When the FE version is older than 2.0.3 but the BE has been upgraded to 2.0.3,
+ // the col_type field in slot_desc defaults to INVALID_TYPE.
+ // This compatibility handling can be removed later.
+ bool has_invalid_type = false;
for (auto& p_slot_desc : pschema.slot_descs()) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
+ if (slot_desc->col_type() == INVALID_TYPE) has_invalid_type = true;
_tuple_desc->add_slot(slot_desc);
- slots_map.emplace(slot_desc->col_name(), slot_desc);
+ string data_type;
+ EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()),
data_type);
+ LOG(INFO) << "lightman " << data_type;
+ slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()),
std::move(data_type)),
+ slot_desc);
}
for (auto& p_index : pschema.indexes()) {
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
- for (auto& col : p_index.columns()) {
- if (_is_partial_update && _partial_update_input_columns.count(col)
== 0) {
- continue;
- }
- auto it = slots_map.find(col);
- if (it == std::end(slots_map)) {
- return Status::InternalError("unknown index column,
column={}", col);
- }
- index->slots.emplace_back(it->second);
- }
for (auto& pcolumn_desc : p_index.columns_desc()) {
+ if (!_is_partial_update ||
+ _partial_update_input_columns.count(pcolumn_desc.name()) > 0) {
+ std::string col_type = has_invalid_type ? "INVALID_TYPE" :
pcolumn_desc.type();
+ auto it = slots_map.find(
+ std::make_pair(to_lower(pcolumn_desc.name()),
col_type));
+ if (it == std::end(slots_map)) {
+ return Status::InternalError("unknown index column,
column={}, type={}",
+ pcolumn_desc.name(),
pcolumn_desc.type());
+ }
+ index->slots.emplace_back(it->second);
+ }
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
@@ -123,41 +134,49 @@ Status OlapTableSchemaParam::init(const
TOlapTableSchemaParam& tschema) {
for (auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
}
- std::map<std::string, SlotDescriptor*> slots_map;
+ std::unordered_map<std::pair<std::string, PrimitiveType>, SlotDescriptor*>
slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
+ // When the FE version is older than 2.0.3 but the BE has been upgraded to 2.0.3,
+ // the col_type field in slot_desc defaults to INVALID_TYPE.
+ // This compatibility handling can be removed later.
+ bool has_invalid_type = false;
for (auto& t_slot_desc : tschema.slot_descs) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
+ if (slot_desc->col_type() == INVALID_TYPE) has_invalid_type = true;
_tuple_desc->add_slot(slot_desc);
- slots_map.emplace(to_lower(slot_desc->col_name()), slot_desc);
+ slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()),
slot_desc->col_type()),
+ slot_desc);
}
for (auto& t_index : tschema.indexes) {
+ std::unordered_map<std::string, SlotDescriptor*> index_slots_map;
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
- for (auto& col : t_index.columns) {
- if (_is_partial_update && _partial_update_input_columns.count(col)
== 0) {
- continue;
- }
- auto it = slots_map.find(to_lower(col));
- if (it == std::end(slots_map)) {
- return Status::InternalError("unknown index column,
column={}", col);
- }
- index->slots.emplace_back(it->second);
- }
- if (t_index.__isset.columns_desc) {
- for (auto& tcolumn_desc : t_index.columns_desc) {
- TabletColumn* tc = _obj_pool.add(new TabletColumn());
- tc->init_from_thrift(tcolumn_desc);
- index->columns.emplace_back(tc);
+ for (auto& tcolumn_desc : t_index.columns_desc) {
+ TPrimitiveType::type col_type = has_invalid_type ?
TPrimitiveType::INVALID_TYPE : tcolumn_desc.column_type.type;
+ auto it =
slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name),
+ thrift_to_type(col_type)));
+ if (!_is_partial_update ||
+ _partial_update_input_columns.count(tcolumn_desc.column_name)
> 0) {
+ if (it == slots_map.end()) {
+ return Status::InternalError("unknown index column,
column={}, type={}",
+ tcolumn_desc.column_name,
+
tcolumn_desc.column_type.type);
+ }
+ index_slots_map.emplace(to_lower(tcolumn_desc.column_name),
it->second);
+ index->slots.emplace_back(it->second);
}
+ TabletColumn* tc = _obj_pool.add(new TabletColumn());
+ tc->init_from_thrift(tcolumn_desc);
+ index->columns.emplace_back(tc);
}
if (t_index.__isset.indexes_desc) {
for (auto& tindex_desc : t_index.indexes_desc) {
std::vector<int32_t>
column_unique_ids(tindex_desc.columns.size());
for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
- auto it = slots_map.find(to_lower(tindex_desc.columns[i]));
- if (it != std::end(slots_map)) {
+ auto it =
index_slots_map.find(to_lower(tindex_desc.columns[i]));
+ if (it != index_slots_map.end()) {
column_unique_ids[i] = it->second->col_unique_id();
}
}
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 9dc1bc20b9f..fefd291e3f7 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -758,6 +758,9 @@ void TabletSchema::build_current_tablet_schema(int64_t
index_id, int32_t version
_indexes.clear();
_field_name_to_index.clear();
_field_id_to_index.clear();
+ _delete_sign_idx = -1;
+ _sequence_col_idx = -1;
+ _version_col_idx = -1;
for (auto& column : index->columns) {
if (column->is_key()) {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index af5d860d19a..abc75fe7a5b 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -121,6 +121,10 @@ public:
private:
int32_t _unique_id;
std::string _col_name;
+ // the field _type will change from TPrimitiveType
+ // to string by 'EnumToString(TPrimitiveType, tcolumn.column_type.type,
data_type);' (reference: TabletMeta::init_column_from_tcolumn)
+ // to FieldType by 'TabletColumn::get_field_type_by_string' (reference:
TabletColumn::init_from_pb).
+ // And the _type in columnPB is string and it changed from FieldType by
'get_string_by_field_type' (reference: TabletColumn::to_schema_pb).
FieldType _type;
bool _is_key = false;
FieldAggregationMethod _aggregation;
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 43b99fa5aa1..15a4b773264 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -31,6 +31,7 @@
#include <memory>
#include "common/object_pool.h"
+#include "runtime/primitive_type.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -59,6 +60,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_col_name(tdesc.colName),
_col_name_lower_case(to_lower(tdesc.colName)),
_col_unique_id(tdesc.col_unique_id),
+ _col_type(thrift_to_type(tdesc.primitive_type)),
_slot_idx(tdesc.slotIdx),
_field_idx(-1),
_is_materialized(tdesc.isMaterialized),
@@ -76,6 +78,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_col_name(pdesc.col_name()),
_col_name_lower_case(to_lower(pdesc.col_name())),
_col_unique_id(pdesc.col_unique_id()),
+ _col_type(static_cast<PrimitiveType>(pdesc.col_type())),
_slot_idx(pdesc.slot_idx()),
_field_idx(-1),
_is_materialized(pdesc.is_materialized()),
@@ -96,6 +99,7 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot)
const {
pslot->set_is_materialized(_is_materialized);
pslot->set_col_unique_id(_col_unique_id);
pslot->set_is_key(_is_key);
+ pslot->set_col_type(_col_type);
}
vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 7bc0ed04b24..0d765854ae6 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -36,6 +36,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/global_types.h"
#include "common/status.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "vec/data_types/data_type.h"
@@ -110,6 +111,7 @@ public:
bool need_materialize() const { return _need_materialize; }
const std::string& col_default_value() const { return _col_default_value; }
+ PrimitiveType col_type() const { return _col_type; }
private:
friend class DescriptorTbl;
@@ -129,6 +131,7 @@ private:
const std::string _col_name_lower_case;
const int32_t _col_unique_id;
+ const PrimitiveType _col_type;
// the idx of the slot in the tuple descriptor (0-based).
// this is provided by the FE
diff --git a/be/src/runtime/primitive_type.cpp
b/be/src/runtime/primitive_type.cpp
index efbf2cf57a1..8636bf0b6bd 100644
--- a/be/src/runtime/primitive_type.cpp
+++ b/be/src/runtime/primitive_type.cpp
@@ -156,6 +156,9 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) {
case TPrimitiveType::AGG_STATE:
return TYPE_AGG_STATE;
+ case TPrimitiveType::VARIANT:
+ return TYPE_VARIANT;
+
default:
CHECK(false) << ", meet unknown type " << ttype;
return INVALID_TYPE;
@@ -258,6 +261,8 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) {
return TPrimitiveType::STRUCT;
case TYPE_LAMBDA_FUNCTION:
return TPrimitiveType::LAMBDA_FUNCTION;
+ case TYPE_AGG_STATE:
+ return TPrimitiveType::AGG_STATE;
default:
return TPrimitiveType::INVALID_TYPE;
@@ -364,6 +369,9 @@ std::string type_to_string(PrimitiveType t) {
case TYPE_LAMBDA_FUNCTION:
return "LAMBDA_FUNCTION TYPE";
+ case TYPE_VARIANT:
+ return "VARIANT";
+
default:
return "";
};
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index f53f3106dec..66f03c1f21d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -287,12 +287,13 @@ public class SlotDescriptor {
public TSlotDescriptor toThrift() {
// Non-nullable slots will have 0 for the byte offset and -1 for the
bit mask
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(),
parent.getId().asInt(), type.toThrift(), -1,
- byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ?
column.getName() : ""), slotIdx,
+ byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ?
column.getNonShadowName() : ""), slotIdx,
isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
if (column != null) {
- LOG.debug("column name:{}, column unique id:{}", column.getName(),
column.getUniqueId());
+ LOG.debug("column name:{}, column unique id:{}",
column.getNonShadowName(), column.getUniqueId());
tSlotDescriptor.setColUniqueId(column.getUniqueId());
+ tSlotDescriptor.setPrimitiveType(column.getDataType().toThrift());
tSlotDescriptor.setIsKey(column.isKey());
tSlotDescriptor.setColDefaultValue(column.getDefaultValue());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 650e24d92ce..1724167afc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -491,7 +491,7 @@ public class Column implements Writable,
GsonPostProcessable {
public TColumn toThrift() {
TColumn tColumn = new TColumn();
- tColumn.setColumnName(this.name);
+ tColumn.setColumnName(removeNamePrefix(this.name));
TColumnType tColumnType = new TColumnType();
tColumnType.setType(this.getDataType().toThrift());
diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto
index 65a001e0ba0..270199cc020 100644
--- a/gensrc/proto/descriptors.proto
+++ b/gensrc/proto/descriptors.proto
@@ -36,6 +36,8 @@ message PSlotDescriptor {
optional bool is_materialized = 10;
optional int32 col_unique_id = 11;
optional bool is_key = 12;
+ optional bool is_auto_increment = 13;
+ optional int32 col_type = 14 [default = 0];
};
message PTupleDescriptor {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index af2860d4233..95f23690d88 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -61,6 +61,7 @@ struct TSlotDescriptor {
// subcolumn path info list for semi structure column(variant)
15: optional list<string> column_paths
16: optional string col_default_value
+ 17: optional Types.TPrimitiveType primitive_type =
Types.TPrimitiveType.INVALID_TYPE
}
struct TTupleDescriptor {
diff --git a/regression-test/suites/schema_change/ddl/lineorder_create.sql
b/regression-test/suites/schema_change/ddl/lineorder_create.sql
new file mode 100644
index 00000000000..44226282198
--- /dev/null
+++ b/regression-test/suites/schema_change/ddl/lineorder_create.sql
@@ -0,0 +1,24 @@
+CREATE TABLE IF NOT EXISTS `lineorder` (
+ `lo_orderkey` bigint(20) NOT NULL COMMENT "",
+ `lo_linenumber` bigint(20) NOT NULL COMMENT "",
+ `lo_custkey` int(11) NOT NULL COMMENT "",
+ `lo_partkey` int(11) NOT NULL COMMENT "",
+ `lo_suppkey` int(11) NOT NULL COMMENT "",
+ `lo_orderdate` int(11) NOT NULL COMMENT "",
+ `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
+ `lo_shippriority` int(11) NOT NULL COMMENT "",
+ `lo_quantity` bigint(20) NOT NULL COMMENT "",
+ `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
+ `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
+ `lo_discount` bigint(20) NOT NULL COMMENT "",
+ `lo_revenue` bigint(20) NOT NULL COMMENT "",
+ `lo_supplycost` bigint(20) NOT NULL COMMENT "",
+ `lo_tax` bigint(20) NOT NULL COMMENT "",
+ `lo_commitdate` bigint(20) NOT NULL COMMENT "",
+ `lo_shipmode` varchar(11) NOT NULL COMMENT ""
+)
+DUPLICATE KEY (`lo_orderkey`, `lo_linenumber`)
+DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1"
+);
diff --git a/regression-test/suites/schema_change/ddl/lineorder_delete.sql
b/regression-test/suites/schema_change/ddl/lineorder_delete.sql
new file mode 100644
index 00000000000..2c1c2fa57d9
--- /dev/null
+++ b/regression-test/suites/schema_change/ddl/lineorder_delete.sql
@@ -0,0 +1 @@
+drop table IF EXISTS lineorder;
diff --git
a/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy
b/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy
new file mode 100644
index 00000000000..43a9bc2b349
--- /dev/null
+++
b/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Most of the cases are copied from
https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+// Note: To filter out tables from sql files, use the following one-liner
command
+// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' |
sort | uniq
+suite("double_write_schema_change") {
+
+ // ssb_sf1_p1 is written to test unique key table merge correctly.
+ // It creates unique key table and sets bucket num to 1 in order to make
sure that
+ // many rowsets will be created during loading and then the merge process
will be triggered.
+
+ def tableName = "lineorder"
+ def columns =
"""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""
+
+ sql new File("""${context.file.parent}/ddl/${tableName}_delete.sql""").text
+ sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+
+ streamLoad {
+ // a default db 'regression_test' is specified in
+ // ${DORIS_HOME}/conf/regression-conf.groovy
+ table tableName
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // the default column_separator is specified in the doris fe config, usually
'\t'.
+ // this line changes it to '|'
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+ set 'columns', columns
+
+
+ // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+ // also, you can stream load a http stream, e.g. http://xxx/some.csv
+ file """${getS3Url()}/regression/ssb/sf1/${tableName}.tbl.gz"""
+
+ time 10000 // limit inflight 10s
+
+ // stream load action will check result, include Success status, and
NumberTotalRows == NumberLoadedRows
+
+ // if a check callback is declared, the default check conditions are
ignored.
+ // So you must check every condition yourself
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ def getJobState = { indexName ->
+ def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """
+ return jobStateResult[0][9]
+ }
+
+ def insert_sql = """ insert into ${tableName} values(100000000, 1, 1, 1,
1, 1, "1", 1, 1, 1, 1, 1, 1, 1, 1, 1, "1") """
+
+ sql """ ALTER TABLE ${tableName} modify COLUMN lo_custkey double"""
+ int max_try_time = 3000
+ while (max_try_time--){
+ String result = getJobState(tableName)
+ if (result == "FINISHED") {
+ sleep(3000)
+ break
+ } else {
+ if (result == "RUNNING") {
+ sql insert_sql
+ }
+ sleep(100)
+ if (max_try_time < 1){
+ assertEquals(1,2)
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]