TsukiokaKogane commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3341594245


##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -74,6 +111,426 @@ Status BlockReader::next_block_with_aggregation(Block* 
block, bool* eof) {
     return res;
 }
 
+Status BlockReader::_ensure_binlog_column_pos(const Block& src_block) {
+    if (_binlog_column_pos_inited) {
+        if (_binlog_op_pos >= 0 && _binlog_op_pos < src_block.columns() &&
+            src_block.get_by_position(_binlog_op_pos).name == 
kRowBinlogOpColName) {
+            return Status::OK();
+        }
+        _binlog_op_pos = -1;
+        _binlog_lsn_pos = -1;
+        _binlog_timestamp_pos = -1;
+        _binlog_column_pos_inited = false;
+    }
+
+    const size_t col_num = src_block.columns();
+    size_t col_names_total_length = 0;
+
+    for (size_t i = 0; i < col_num; ++i) {
+        const auto& name = src_block.get_by_position(i).name;
+        col_names_total_length += name.size();
+        if (name == kRowBinlogOpColName) {
+            _binlog_op_pos = static_cast<int>(i);
+        } else if (name == kRowBinlogLsnColName) {
+            _binlog_lsn_pos = static_cast<int>(i);
+        } else if (name == kRowBinlogTimestampColName) {
+            _binlog_timestamp_pos = static_cast<int>(i);
+        }
+    }
+
+    if (_binlog_op_pos < 0) {
+        std::string col_names;
+        col_names.reserve(col_names_total_length + (col_num > 0 ? (col_num - 
1) * 2 : 0));
+        if (col_num > 0) {
+            col_names.append(src_block.get_by_position(0).name);
+            for (size_t i = 1; i < col_num; ++i) {
+                col_names.append(", ");
+                col_names.append(src_block.get_by_position(i).name);
+            }
+        }
+        return Status::InternalError("row binlog op column not found, block 
columns: {}",
+                                     col_names);
+    }
+
+    _binlog_column_pos_inited = true;
+    return Status::OK();
+}
+
+int64_t BlockReader::_read_binlog_op(const IColumn& col, size_t row) const {
+    const IColumn* cur = &col;
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(*cur)) {
+        if (nullable->is_null_at(row)) {
+            return ROW_BINLOG_UNKNOWN;
+        }
+        cur = &nullable->get_nested_column();
+    }
+
+    if (const auto* int64_col = check_and_get_column<ColumnInt64>(*cur)) {
+        return int64_col->get_element(row);
+    }
+
+    return ROW_BINLOG_UNKNOWN;
+}
+
+Status BlockReader::_write_binlog_op(IColumn& col, int64_t op) const {
+    IColumn* cur = &col;
+    ColumnNullable* nullable = nullptr;
+    if (auto* n = typeid_cast<ColumnNullable*>(cur)) {
+        nullable = n;
+        cur = &nullable->get_nested_column();
+    }
+
+    if (auto* int64_col = typeid_cast<ColumnInt64*>(cur)) {
+        int64_col->insert_value(op);
+    } else {
+        return Status::InternalError("invalid column type");
+    }
+
+    if (nullable != nullptr) {
+        nullable->get_null_map_data().push_back(0);
+    }
+    return Status::OK();
+}
+
+bool BlockReader::_is_binlog_meta_column(int idx) const {
+    return idx == _binlog_op_pos || idx == _binlog_lsn_pos || idx == 
_binlog_timestamp_pos;
+}
+
+int BlockReader::_resolve_source_column_index(const Block& src_block, int idx,
+                                              bool use_before) const {
+    if (!use_before || _is_binlog_meta_column(idx)) {
+        return idx;
+    }
+
+    return resolve_before_column_index(src_block, idx, _binlog_op_pos);
+}
+
+void BlockReader::_init_pending_row_columns(const Block& block) {
+    if (!_pending_row_columns.empty()) {
+        return;
+    }
+    _pending_row_columns = block.clone_empty_columns();
+}
+
+bool BlockReader::_emit_pending_row(Block* block, MutableColumns& 
target_columns,
+                                    size_t& output_row_count, bool* eof) {
+    if (!_has_pending_row) {
+        return false;
+    }
+
+    for (size_t i = 0; i < _pending_row_columns.size(); ++i) {
+        target_columns[i]->insert_from(*_pending_row_columns[i], 0);
+        _pending_row_columns[i]->clear();
+    }
+    _has_pending_row = false;
+    output_row_count++;
+
+    if (_eof) {
+        block->set_columns(std::move(target_columns));
+        *eof = false;
+        return true;
+    }
+
+    return false;
+}
+
+Status BlockReader::_append_change_row(MutableColumns& target_columns, const 
Block& src_block,
+                                       size_t row_pos, int64_t output_op, bool 
use_before) {
+    RETURN_IF_ERROR(_ensure_binlog_column_pos(src_block));
+    for (auto idx : _normal_columns_idx) {
+        int target_col_idx = _return_columns_loc[idx];
+        if (target_col_idx < 0) {
+            continue;
+        }
+        if (idx == _binlog_op_pos) {
+            RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx], 
output_op));
+            continue;
+        }
+        int source_idx = _resolve_source_column_index(src_block, idx, 
use_before);
+        RETURN_IF_ERROR(insert_cell_with_wrapper_adaptation(
+                *target_columns[target_col_idx], 
*src_block.get_by_position(source_idx).column,
+                row_pos));
+    }
+    return Status::OK();
+}
+
+Status BlockReader::_min_delta_next_block(Block* block, bool* eof) {
+    if (UNLIKELY(_eof && !_has_pending_row)) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    if (_stored_data_columns.empty()) {
+        _stored_data_columns = _next_row.block->clone_empty_columns();
+    }
+
+    auto target_columns_guard = block->mutate_columns_scoped();
+    auto& target_columns = target_columns_guard.mutable_columns();
+    size_t output_row_count = 0;
+
+    _init_pending_row_columns(*block);
+    if (_emit_pending_row(block, target_columns, output_row_count, eof)) {
+        return Status::OK();
+    }
+
+    while (output_row_count < _reader_context.batch_size && !_eof) {
+        if (_stored_data_columns[0]->empty()) {
+            for (size_t i = 0; i < _stored_data_columns.size(); ++i) {
+                
_stored_data_columns[i]->insert_from(*_next_row.block->get_by_position(i).column,
+                                                     _next_row.row_pos);
+            }
+        }
+
+        IteratorRowRef last_row_ref = _next_row;
+
+        auto res = _vcollect_iter.next(&_next_row);
+        if (UNLIKELY(res.is<END_OF_FILE>())) {
+            _eof = true;
+            *eof = true;
+        } else if (UNLIKELY(!res.ok())) {
+            return res;
+        }
+
+        if (!_eof && _min_delta_next_row_has_same_key()) {
+            continue;
+        }
+
+        if (UNLIKELY(last_row_ref.block == nullptr)) {
+            return Status::InternalError("invalid row reference in min-delta 
stream reader");
+        }
+        RETURN_IF_ERROR(_ensure_binlog_column_pos(*last_row_ref.block));
+        auto first_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos], 
0);
+
+        auto& last_op_col = 
last_row_ref.block->get_by_position(_binlog_op_pos).column;
+        auto last_op = _read_binlog_op(*last_op_col, last_row_ref.row_pos);
+
+        auto result = AggregateFunctionMinDelta::calculate_result(first_op, 
last_op);
+        switch (result) {
+        case AggregateFunctionMinDelta::ResultType::SKIP:
+            break;
+        case AggregateFunctionMinDelta::ResultType::INSERT:
+            for (auto idx : _normal_columns_idx) {
+                int target_col_idx = _return_columns_loc[idx];
+                if (idx == _binlog_op_pos) {
+                    
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                     STREAM_CHANGE_INSERT));
+                } else {
+                    target_columns[target_col_idx]->insert_from(
+                            *last_row_ref.block->get_by_position(idx).column, 
last_row_ref.row_pos);
+                }
+            }
+            output_row_count++;
+            break;
+        case AggregateFunctionMinDelta::ResultType::DELETE:
+            for (auto idx : _normal_columns_idx) {
+                int target_col_idx = _return_columns_loc[idx];
+                if (idx == _binlog_op_pos) {
+                    
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                     STREAM_CHANGE_DELETE));
+                } else {
+                    target_columns[target_col_idx]->insert_from(
+                            *last_row_ref.block->get_by_position(idx).column, 
last_row_ref.row_pos);
+                }
+            }
+            output_row_count++;
+            break;
+        case AggregateFunctionMinDelta::ResultType::UPDATE_BEFORE_AFTER:
+            for (auto idx : _normal_columns_idx) {
+                int target_col_idx = _return_columns_loc[idx];
+                if (idx == _binlog_op_pos) {
+                    
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                     
STREAM_CHANGE_UPDATE_BEFORE));
+                } else if (idx == _binlog_lsn_pos) {
+                    target_columns[target_col_idx]->insert_from(
+                            *last_row_ref.block->get_by_position(idx).column, 
last_row_ref.row_pos);
+                } else {
+                    int source_idx = 
_resolve_source_column_index(*last_row_ref.block, idx, true);
+                    
target_columns[target_col_idx]->insert_from(*_stored_data_columns[source_idx],
+                                                                0);
+                }
+            }
+            output_row_count++;
+
+            if (output_row_count >= _reader_context.batch_size) {
+                for (auto& col : _pending_row_columns) {
+                    col->clear();
+                }
+                for (auto idx : _normal_columns_idx) {
+                    int target_col_idx = _return_columns_loc[idx];
+                    if (idx == _binlog_op_pos) {
+                        
RETURN_IF_ERROR(_write_binlog_op(*_pending_row_columns[target_col_idx],
+                                                         
STREAM_CHANGE_UPDATE_AFTER));
+                    } else {
+                        _pending_row_columns[target_col_idx]->insert_from(
+                                
*last_row_ref.block->get_by_position(idx).column,
+                                last_row_ref.row_pos);
+                    }
+                }
+                _has_pending_row = true;
+            } else {
+                for (auto idx : _normal_columns_idx) {
+                    int target_col_idx = _return_columns_loc[idx];
+                    if (idx == _binlog_op_pos) {
+                        
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                         
STREAM_CHANGE_UPDATE_AFTER));
+                    } else {
+                        target_columns[target_col_idx]->insert_from(
+                                
*last_row_ref.block->get_by_position(idx).column,
+                                last_row_ref.row_pos);
+                    }
+                }
+                output_row_count++;
+            }
+            break;
+        }
+
+        for (auto& col : _stored_data_columns) {
+            col->clear();
+        }
+    }
+
+    block->set_columns(std::move(target_columns));
+    *eof = _eof && !_has_pending_row;
+    return Status::OK();
+}
+
+Status BlockReader::_detail_change_next_block(Block* block, bool* eof) {
+    auto output_template_block = block->clone_empty();
+    auto target_columns_guard = block->mutate_columns_scoped();
+    auto& target_columns = target_columns_guard.mutable_columns();
+    size_t output_row_count = 0;
+    _init_pending_row_columns(*block);
+    if (_emit_pending_row(block, target_columns, output_row_count, eof)) {
+        return Status::OK();
+    }
+
+    while (output_row_count < _reader_context.batch_size) {
+        if (_vcollect_iter.is_merge()) {
+            if (_eof) {
+                break;
+            }
+            if (UNLIKELY(_next_row.block == nullptr)) {
+                return Status::InternalError("invalid row reference in detail 
stream reader");
+            }
+            const Block& source_block = *_next_row.block;
+            const size_t row = _next_row.row_pos;
+            RETURN_IF_ERROR(_ensure_binlog_column_pos(source_block));
+            int64_t op = 
_read_binlog_op(*source_block.get_by_position(_binlog_op_pos).column, row);
+            if (op == ROW_BINLOG_UPDATE) {
+                RETURN_IF_ERROR(_append_change_row(target_columns, 
source_block, row,
+                                                   
STREAM_CHANGE_UPDATE_BEFORE, true));
+                output_row_count++;
+                if (output_row_count >= _reader_context.batch_size) {
+                    for (auto& col : _pending_row_columns) {
+                        col->clear();
+                    }
+                    RETURN_IF_ERROR(_append_change_row(_pending_row_columns, 
source_block, row,
+                                                       
STREAM_CHANGE_UPDATE_AFTER, false));
+                    _has_pending_row = true;
+                } else {
+                    RETURN_IF_ERROR(_append_change_row(target_columns, 
source_block, row,
+                                                       
STREAM_CHANGE_UPDATE_AFTER, false));
+                    output_row_count++;
+                }
+            } else if (op == ROW_BINLOG_APPEND) {
+                RETURN_IF_ERROR(_append_change_row(target_columns, 
source_block, row,
+                                                   STREAM_CHANGE_INSERT, 
false));
+                output_row_count++;
+            } else if (op == ROW_BINLOG_DELETE) {
+                RETURN_IF_ERROR(_append_change_row(target_columns, 
source_block, row,
+                                                   STREAM_CHANGE_DELETE, 
false));
+                output_row_count++;
+            }
+
+            auto res = _vcollect_iter.next(&_next_row);
+            if (UNLIKELY(res.is<END_OF_FILE>())) {
+                _eof = true;
+                *eof = true;
+            } else if (UNLIKELY(!res.ok())) {
+                return res;
+            }
+            continue;
+        }
+
+        DCHECK(_next_row.block != nullptr);
+        auto source_template_block = _next_row.block->clone_empty();
+        Block source_block_storage;
+        source_block_storage = source_template_block.clone_empty();
+        Block* read_block = &source_block_storage;
+        Status res = _vcollect_iter.next(read_block);
+        if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
+            return res;
+        }
+        *eof = res.is<END_OF_FILE>();
+        _eof = *eof;
+        const Block& source_block = *read_block;
+        if (source_block.rows() == 0) {
+            break;
+        }
+        RETURN_IF_ERROR(_ensure_binlog_column_pos(source_block));
+        auto result_columns = output_template_block.clone_empty_columns();
+
+        for (size_t row = 0;

Review Comment:
   Will fix it by removing the redundant code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to