zhannngchen commented on code in PR #17542:
URL: https://github.com/apache/doris/pull/17542#discussion_r1178914728


##########
be/src/olap/tablet.cpp:
##########
@@ -2509,16 +2606,71 @@ Status Tablet::_load_rowset_segments(const 
RowsetSharedPtr& rowset,
     return Status::OK();
 }
 
+void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block) {

Review Comment:
   move the method to some Util class? There's too much methods in Tablet class



##########
be/src/olap/tablet.cpp:
##########
@@ -2582,19 +2741,48 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id,
 
                     // sequence id smaller than the previous one, so delete 
current row
                     if (st.is<ALREADY_EXIST>()) {
-                        loc.rowset_id = rowset_id;
-                        loc.segment_id = seg->id();
-                        loc.row_id = row_id;
+                        delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
+                        continue;
+                    } else if (is_partial_update && rowset_writer != nullptr) {
+                        // In publish version, record rows to be deleted for 
concurrent update
+                        // For example, if version 5 and 6 update a row, but 
version 6 only see
+                        // version 4 when write, and when publish version, 
version 5's value will
+                        // be marked as deleted and it's update is losed.
+                        // So here we should read version 5's columns and 
build a new row, which is
+                        // consists of version 6's update columns and version 
5's origin columns
+                        // here we build 2 read plan for ori values and update 
values
+                        prepare_to_read(loc, pos, &read_plan_ori);
+                        prepare_to_read(RowLocation {rowset_id, seg->id(), 
row_id}, pos,
+                                        &read_plan_update);
+                        rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
+                        ++pos;
+                        // sort segment rows here
+                        if (pos >= segment_row_max) {
+                            
generate_new_block_for_partial_update(rowset_schema, read_plan_ori,
+                                                                  
read_plan_update, rsid_to_rowset,
+                                                                  &block);
+                            sort_block(block, ordered_block);
+                            int64_t size;
+                            
rowset_writer->flush_single_memtable(&ordered_block, &size);
+                            // clear all tmp data
+                            read_plan_ori.clear();
+                            read_plan_update.clear();
+                            pos = 0;
+                            block.clear_column_data();
+                            ordered_block.clear_column_data();
+                        }
+                        // delete bitmap will be calculate when memtable flush 
and
+                        // publish. The two stages may see different versions.
+                        // When there is sequence column, the currently 
imported data
+                        // of rowset may be marked for deletion at memtablet 
flush or
+                        // publish because the seq column is smaller than the 
previous
+                        // rowset.
+                        // just set 0 as a unified temporary version number, 
and update to
+                        // the real version number later.
+                        delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, 
loc.row_id);
+                        delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
+                        continue;
                     }
-
-                    // delete bitmap will be calculate when memtable flush and
-                    // publish. The two stages may see different versions.
-                    // When there is sequence column, the currently imported 
data
-                    // of rowset may be marked for deletion at memtablet flush 
or
-                    // publish because the seq column is smaller than the 
previous
-                    // rowset.
-                    // just set 0 as a unified temporary version number, and 
update to
-                    // the real version number later.
                     delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, 
loc.row_id);

Review Comment:
   This line is useless



##########
be/src/olap/tablet.cpp:
##########
@@ -2509,16 +2606,71 @@ Status Tablet::_load_rowset_segments(const 
RowsetSharedPtr& rowset,
     return Status::OK();
 }
 
+void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block) {
+    vectorized::MutableBlock mutable_input_block =
+            vectorized::MutableBlock::build_mutable_block(&in_block);
+    vectorized::MutableBlock mutable_output_block =
+            vectorized::MutableBlock::build_mutable_block(&output_block);
+
+    std::vector<RowInBlock*> _row_in_blocks;
+    _row_in_blocks.reserve(in_block.rows());
+
+    std::unique_ptr<Schema> schema(new Schema(_schema));
+    std::shared_ptr<RowInBlockComparator> vec_row_comparator =
+            std::make_shared<RowInBlockComparator>(schema.get());
+    vec_row_comparator->set_block(&mutable_input_block);
+
+    std::vector<RowInBlock*> row_in_blocks;
+    DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
+    row_in_blocks.reserve(in_block.rows());
+    for (size_t i = 0; i < in_block.rows(); ++i) {
+        row_in_blocks.emplace_back(new RowInBlock {i});
+    }
+    std::sort(row_in_blocks.begin(), row_in_blocks.end(),
+              [&](const RowInBlock* l, const RowInBlock* r) -> bool {
+                  auto value = (*vec_row_comparator)(l, r);
+                  if (value == 0) {

Review Comment:
   value == 0 should not happen, which means there're some duplicate keys.
   add a CHECK here



##########
be/src/olap/tablet.cpp:
##########
@@ -2509,16 +2606,71 @@ Status Tablet::_load_rowset_segments(const 
RowsetSharedPtr& rowset,
     return Status::OK();
 }
 
+void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block) {
+    vectorized::MutableBlock mutable_input_block =
+            vectorized::MutableBlock::build_mutable_block(&in_block);
+    vectorized::MutableBlock mutable_output_block =
+            vectorized::MutableBlock::build_mutable_block(&output_block);
+
+    std::vector<RowInBlock*> _row_in_blocks;
+    _row_in_blocks.reserve(in_block.rows());
+
+    std::unique_ptr<Schema> schema(new Schema(_schema));
+    std::shared_ptr<RowInBlockComparator> vec_row_comparator =
+            std::make_shared<RowInBlockComparator>(schema.get());
+    vec_row_comparator->set_block(&mutable_input_block);
+
+    std::vector<RowInBlock*> row_in_blocks;
+    DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
+    row_in_blocks.reserve(in_block.rows());
+    for (size_t i = 0; i < in_block.rows(); ++i) {
+        row_in_blocks.emplace_back(new RowInBlock {i});
+    }
+    std::sort(row_in_blocks.begin(), row_in_blocks.end(),
+              [&](const RowInBlock* l, const RowInBlock* r) -> bool {
+                  auto value = (*vec_row_comparator)(l, r);
+                  if (value == 0) {

Review Comment:
   maybe should use DCHECK, and return an error on such case, which will make 
the load fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to