This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a467e7a [refactor][fix] small fixes and code cleanups related to
schema change (#8328)
a467e7a is described below
commit a467e7a7906761549345a70419335895dec52a37
Author: dataroaring <[email protected]>
AuthorDate: Sat Mar 12 22:05:43 2022 +0800
[refactor][fix] small fixes and code cleanups related to schema change
(#8328)
For now, usage of RowBlockAllocator::allocate is a little complicated
due to its ambiguous return value. Some callers just test the return value
while some test the return value and non-null pointer. This patch let
it return success code only when it succeeds, then caller can just
test the return value.
---
be/src/olap/schema_change.cpp | 104 ++++++---------------
.../apache/doris/analysis/ColumnRenameClause.java | 2 +-
2 files changed, 32 insertions(+), 74 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 0bea116..c0b2085 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -92,7 +92,7 @@ private:
};
bool _make_heap(const std::vector<RowBlock*>& row_block_arr);
- bool _pop_heap();
+ void _pop_heap();
TabletSharedPtr _tablet;
std::priority_queue<MergeElement> _heap;
@@ -693,8 +693,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
}
if (_row_block_allocator->allocate(&_swap_row_block, row_num,
null_supported) !=
- OLAP_SUCCESS ||
- _swap_row_block == nullptr) {
+ OLAP_SUCCESS) {
LOG(WARNING) << "fail to allocate memory.";
return false;
}
@@ -769,7 +768,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock**
row_block, size_t num_rows, bo
<< "You can increase the memory "
<< "by changing the
Config.memory_limitation_per_thread_for_schema_change";
*row_block = nullptr;
- return OLAP_SUCCESS;
+ return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
// TODO(lijiao) : Why abandon the original m_row_block_buffer
@@ -832,16 +831,17 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>&
row_block_arr, RowsetWr
goto MERGE_ERR;
}
- _make_heap(row_block_arr);
+ if (!_make_heap(row_block_arr)) {
+ // There is error log in _make_heap, so no need to more log.
+ goto MERGE_ERR;
+ }
row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
while (_heap.size() > 0) {
init_row_with_others(&row_cursor, *(_heap.top().row_cursor),
mem_pool.get(),
agg_object_pool.get());
- if (!_pop_heap()) {
- goto MERGE_ERR;
- }
+ _pop_heap();
if (KeysType::DUP_KEYS == _tablet->keys_type()) {
if (rowset_writer->add_row(row_cursor) != OLAP_SUCCESS) {
@@ -856,9 +856,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>&
row_block_arr, RowsetWr
// we should fix this trick ASAP
agg_update_row(&row_cursor, *(_heap.top().row_cursor), nullptr);
++tmp_merged_rows;
- if (!_pop_heap()) {
- goto MERGE_ERR;
- }
+ _pop_heap();
}
agg_finalize_row(&row_cursor, mem_pool.get());
if (rowset_writer->add_row(row_cursor) != OLAP_SUCCESS) {
@@ -915,19 +913,19 @@ bool RowBlockMerger::_make_heap(const
std::vector<RowBlock*>& row_block_arr) {
return true;
}
-bool RowBlockMerger::_pop_heap() {
+void RowBlockMerger::_pop_heap() {
MergeElement element = _heap.top();
_heap.pop();
if (++element.row_block_index >=
element.row_block->row_block_info().row_num) {
SAFE_DELETE(element.row_cursor);
- return true;
+ return;
}
element.row_block->get_row(element.row_block_index, element.row_cursor);
_heap.push(element);
- return true;
+ return;
}
OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
@@ -1023,19 +1021,9 @@ OLAPStatus
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
}
}
- bool need_create_empty_version = false;
OLAPStatus res = OLAP_SUCCESS;
- if (!rowset_reader->rowset()->empty()) {
- int num_rows = rowset_reader->rowset()->num_rows();
- if (num_rows == 0) {
- // actually, the rowset is empty
- need_create_empty_version = true;
- }
- } else {
- need_create_empty_version = true;
- }
-
- if (need_create_empty_version) {
+ if (rowset_reader->rowset()->empty() ||
+ rowset_reader->rowset()->num_rows() == 0) {
res = rowset_writer->flush();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "create empty version for schema change failed."
@@ -1104,14 +1092,10 @@ OLAPStatus
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
<< ", new_index_rows=" << rowset_writer->num_rows();
res = OLAP_ERR_ALTER_STATUS_ERR;
}
- LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows() << ", filtered_rows="
<< filtered_rows()
- << ", new_index_rows=" << rowset_writer->num_rows();
- } else {
- LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows() << ", filtered_rows="
<< filtered_rows()
- << ", new_index_rows=" << rowset_writer->num_rows();
}
+ LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows()
+ << ", merged_rows=" << merged_rows() << ", filtered_rows=" <<
filtered_rows()
+ << ", new_index_rows=" << rowset_writer->num_rows();
return res;
}
@@ -1147,14 +1131,10 @@ OLAPStatus
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
}
- bool need_create_empty_version = false;
OLAPStatus res = OLAP_SUCCESS;
RowsetSharedPtr rowset = rowset_reader->rowset();
- if (rowset->empty() || rowset->num_rows() == 0) {
- need_create_empty_version = true;
- }
- if (need_create_empty_version) {
+ if (rowset->empty() || rowset->num_rows() == 0) {
res = new_rowset_writer->flush();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "create empty version for schema change failed."
@@ -1194,10 +1174,7 @@ OLAPStatus
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
reset_merged_rows();
reset_filtered_rows();
- bool use_beta_rowset = false;
- if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- use_beta_rowset = true;
- }
+ bool use_beta_rowset = new_tablet->tablet_meta()->preferred_rowset_type()
== BETA_ROWSET;
SegmentsOverlapPB segments_overlap =
rowset->rowset_meta()->segments_overlap();
RowBlock* ref_row_block = nullptr;
@@ -1336,14 +1313,10 @@ OLAPStatus
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
<< ", new_index_rows=" <<
new_rowset_writer->num_rows();
res = OLAP_ERR_ALTER_STATUS_ERR;
}
- LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows() << ", filtered_rows="
<< filtered_rows()
- << ", new_index_rows=" << new_rowset_writer->num_rows();
- } else {
- LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows() << ", filtered_rows="
<< filtered_rows()
- << ", new_index_rows=" << new_rowset_writer->num_rows();
}
+ LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows()
+ << ", merged_rows=" << merged_rows() << ", filtered_rows=" <<
filtered_rows()
+ << ", new_index_rows=" << new_rowset_writer->num_rows();
return res;
}
@@ -2049,35 +2022,20 @@ OLAPStatus SchemaChangeHandler::_parse_request(
}
// Newly added column go here
- //if (new_column_schema.is_allow_null ||
new_column_schema.has_default_value) {
- {
- column_mapping->ref_column = -1;
-
- if (i < base_tablet->num_short_key_columns()) {
- *sc_directly = true;
- }
-
- if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping,
new_column,
-
new_column.default_value()))) {
- return res;
- }
+ column_mapping->ref_column = -1;
- VLOG_TRACE << "A column with default value will be added after
schema changing. "
- << "column=" << column_name
- << ", default_value=" << new_column.default_value();
- continue;
+ if (i < base_tablet->num_short_key_columns()) {
+ *sc_directly = true;
}
- // XXX: Only when DROP COLUMN, you will enter here when you encounter
a new Schema to an old Schema。
- column_mapping->ref_column = -1;
-
- if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping,
new_column, ""))) {
- return res;
+ if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping,
new_column,
+
new_column.default_value()))) {
+ return res;
}
- VLOG_NOTICE << "A new schema delta is converted while dropping column.
"
- << "Dropped column will be assigned as '0' for the older
schema. "
- << "column=" << column_name;
+ VLOG_TRACE << "A column with default value will be added after schema
changing. "
+ << "column=" << column_name
+ << ", default_value=" << new_column.default_value();
}
// Check if re-aggregation is needed.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
index e3d9d69..5924da5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
@@ -31,7 +31,7 @@ public class ColumnRenameClause extends AlterTableClause {
private String newColName;
public ColumnRenameClause(String colName, String newColName) {
- super(AlterOpType.SCHEMA_CHANGE);
+ super(AlterOpType.RENAME);
this.colName = colName;
this.newColName = newColName;
this.needTableStable = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]