This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d3e1baddeb [feature](branch-2.0) add failure tolerance for strict mode partial update stream load (#22100)
d3e1baddeb is described below

commit d3e1baddebf9be68367af794ab8589945a31bfef
Author: bobhan1 <[email protected]>
AuthorDate: Mon Jul 24 19:35:12 2023 +0800

    [feature](branch-2.0) add failure tolerance for strict mode partial update stream load (#22100)
---
 be/src/olap/delta_writer.cpp                       |  4 ++++
 be/src/olap/delta_writer.h                         |  2 ++
 be/src/olap/rowset/beta_rowset_writer.cpp          |  2 ++
 be/src/olap/rowset/beta_rowset_writer.h            |  5 ++++
 be/src/olap/rowset/rowset_writer.h                 |  2 ++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 11 ++++++---
 be/src/olap/rowset/segment_v2/segment_writer.h     |  3 +++
 be/src/runtime/runtime_state.cpp                   |  1 +
 be/src/runtime/runtime_state.h                     |  9 +++++++
 be/src/runtime/tablets_channel.cpp                 |  1 +
 be/src/vec/sink/vtablet_sink.cpp                   | 12 ++++++++--
 gensrc/proto/internal_service.proto                |  1 +
 .../test_partial_update_strict_mode.out            | 13 ++++++++++
 ...oovy => test_partial_update_strict_mode.groovy} | 28 +++++++++++++++++-----
 .../test_partial_update_upsert.groovy              |  2 +-
 15 files changed, 84 insertions(+), 12 deletions(-)

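At a glance, the patch threads one new counter from the segment writer all
the way up to the load summary. A condensed sketch of that chain, assembled
from the hunks below (names are taken verbatim from this diff; surrounding
code is elided and class qualifiers are added for readability):

    // SegmentWriter tallies the rows it filters in strict mode ...
    int64_t SegmentWriter::num_rows_filtered() const { return _num_rows_filtered; }

    // ... BetaRowsetWriter sums the per-segment tallies at flush time ...
    _num_rows_filtered += (*writer)->num_rows_filtered();

    // ... DeltaWriter exposes the rowset-level total ...
    int64_t DeltaWriter::num_rows_filtered() const {
        return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered();
    }

    // ... TabletsChannel ships it back in PTabletInfo, and VNodeChannel adds
    // it to RuntimeState, where VOlapTableSink::close() folds it into the
    // filtered-row counters checked against max_filter_ratio.
    tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
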
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index da2cc7cda7..c5c0797657 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -725,4 +725,8 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succe
     _unfinished_slave_node.erase(node_id);
 }
 
+int64_t DeltaWriter::num_rows_filtered() const {
+    return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered();
+}
+
 } // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 52b407876f..daf091bf8e 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -129,6 +129,8 @@ public:
 
     int64_t total_received_rows() const { return _total_received_rows; }
 
+    int64_t num_rows_filtered() const;
+
 private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
                 const UniqueId& load_id);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 03de7cf84d..a12e81d625 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -70,6 +70,7 @@ BetaRowsetWriter::BetaRowsetWriter()
           _total_data_size(0),
           _total_index_size(0),
           _raw_num_rows_written(0),
+          _num_rows_filtered(0),
           _segcompaction_worker(this),
           _is_doing_segcompaction(false) {
     _segcompaction_status.store(OK);
@@ -829,6 +830,7 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
     VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
                << " data_size:" << segment_size << " index_size:" << index_size;
 
+    _num_rows_filtered += (*writer)->num_rows_filtered();
     writer->reset();
     if (flush_size) {
         *flush_size = segment_size + index_size;
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index b646f2a681..c7554cee72 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -36,6 +36,7 @@
 
 #include "common/status.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "olap/delta_writer.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_meta.h"
@@ -96,6 +97,8 @@ public:
 
     int64_t num_rows() const override { return _raw_num_rows_written; }
 
+    int64_t num_rows_filtered() const override { return _num_rows_filtered; }
+
     RowsetId rowset_id() override { return _context.rowset_id; }
 
     RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
@@ -201,6 +204,8 @@ protected:
         KeyBoundsPB key_bounds;
     };
     std::map<uint32_t, Statistics> _segid_statistics_map;
+    std::atomic<int64_t> _num_rows_filtered;
+
     std::mutex _segid_statistics_map_mutex;
 
     bool _is_pending = false;
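Note that the new accumulator above is a std::atomic<int64_t> rather than a
plain counter, presumably because _flush_segment_writer can be reached
concurrently (the constructor hunk shows the _segcompaction_worker initialized
alongside it), so the += in beta_rowset_writer.cpp stays race-free.
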
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index a614a48470..60a6fdc570 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -101,6 +101,8 @@ public:
 
     virtual int64_t num_rows() const = 0;
 
+    virtual int64_t num_rows_filtered() const = 0;
+
     virtual RowsetId rowset_id() = 0;
 
     virtual RowsetTypePB type() const = 0;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 16be990554..226e1062db 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -46,6 +46,7 @@
 #include "olap/segment_loader.h"
 #include "olap/short_key_index.h"
 #include "olap/tablet_schema.h"
+#include "olap/utils.h"
 #include "runtime/memory/mem_tracker.h"
 #include "service/point_query_executor.h"
 #include "util/coding.h"
@@ -376,6 +377,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
     // locate rows in base data
+
+    int64_t num_rows_filtered = 0;
     {
         for (size_t pos = row_pos; pos < num_rows; pos++) {
             std::string key = _full_encode_keys(key_columns, pos);
@@ -389,9 +392,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
                                              _mow_context->max_version, segment_caches, &rowset);
             if (st.is<NOT_FOUND>()) {
                 if (_tablet_schema->is_strict_mode()) {
-                    return Status::InternalError(
-                            "partial update in strict mode only support updating rows with an "
-                            "existing key!");
+                    ++num_rows_filtered;
+                    // delete the invalid newly inserted row
+                    _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
+                                                     pos);
                 }
 
                 if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
@@ -440,6 +444,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
                                                      num_rows));
     }
 
+    _num_rows_filtered += num_rows_filtered;
     _num_rows_written += num_rows;
     _olap_data_convertor->clear_source_content();
     return Status::OK();
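
The behavioral core is the hunk above: a partial-update row whose key has no
match in the base data used to fail the whole strict-mode load with an
InternalError; it is now counted and masked through the merge-on-write delete
bitmap so it never becomes visible. Condensed from the hunk (the
{rowset_id, _segment_id, 0} triple addresses the row in the segment currently
being written):

    if (st.is<NOT_FOUND>() && _tablet_schema->is_strict_mode()) {
        ++num_rows_filtered;
        // mark the provisionally inserted row deleted in this very segment,
        // instead of aborting the load
        _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
                                         pos);
    }
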
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 742fa63a4f..0b17ed4faa 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -109,6 +109,7 @@ public:
 
     size_t get_inverted_index_file_size() const { return _inverted_index_file_size; }
     uint32_t num_rows_written() const { return _num_rows_written; }
+    int64_t num_rows_filtered() const { return _num_rows_filtered; }
     uint32_t row_count() const { return _row_count; }
 
     Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
@@ -196,6 +197,8 @@ private:
     bool _has_key = true;
     // _num_rows_written means row count already written in this current column group
     uint32_t _num_rows_written = 0;
+    // number of rows filtered in strict mode partial update
+    int64_t _num_rows_filtered = 0;
     // _row_count means total row count of this segment
     // In vertical compaction row count is recorded when key columns group finish
     //  and _num_rows_written will be updated in value column group
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 28de6c752e..0a6ebca77d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -115,6 +115,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
           _num_rows_load_total(0),
           _num_rows_load_filtered(0),
           _num_rows_load_unselected(0),
+          _num_rows_filtered_in_strict_mode_partial_update(0),
           _num_print_error_rows(0),
           _num_bytes_load_total(0),
           _num_finished_scan_range(0),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 89543e492b..6ef4ee1081 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -241,6 +241,10 @@ public:
 
     int64_t num_rows_load_unselected() { return _num_rows_load_unselected.load(); }
 
+    int64_t num_rows_filtered_in_strict_mode_partial_update() {
+        return _num_rows_filtered_in_strict_mode_partial_update;
+    }
+
     int64_t num_rows_load_success() {
        return num_rows_load_total() - num_rows_load_filtered() - num_rows_load_unselected();
     }
@@ -265,6 +269,10 @@ public:
         _num_rows_load_unselected.fetch_add(num_rows);
     }
 
+    void update_num_rows_filtered_in_strict_mode_partial_update(int64_t num_rows) {
+        _num_rows_filtered_in_strict_mode_partial_update += num_rows;
+    }
+
     void set_per_fragment_instance_idx(int idx) { _per_fragment_instance_idx = idx; }
 
    int per_fragment_instance_idx() const { return _per_fragment_instance_idx; }
@@ -492,6 +500,7 @@ private:
     std::atomic<int64_t> _num_rows_load_total;      // total rows read from source
     std::atomic<int64_t> _num_rows_load_filtered;   // unqualified rows
     std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by predicates
+    std::atomic<int64_t> _num_rows_filtered_in_strict_mode_partial_update;
     std::atomic<int64_t> _num_print_error_rows;
 
     std::atomic<int64_t> _num_bytes_load_total; // total bytes read from source
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index ad8b724ebc..3862fd533e 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -240,6 +240,7 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
         tablet_info->set_tablet_id(writer->tablet_id());
         tablet_info->set_schema_hash(writer->schema_hash());
         tablet_info->set_received_rows(writer->total_received_rows());
+        tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
     } else {
         PTabletError* tablet_error = tablet_errors->Add();
         tablet_error->set_tablet_id(writer->tablet_id());
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 47f34ab7d0..37d918b18b 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -424,6 +424,10 @@ Status VNodeChannel::open_wait() {
                        _tablets_received_rows.emplace_back(tablet.tablet_id(),
                                                            tablet.received_rows());
                     }
+                    if (tablet.has_num_rows_filtered()) {
+                        _state->update_num_rows_filtered_in_strict_mode_partial_update(
+                                tablet.num_rows_filtered());
+                    }
                    VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
                                   << ", backendId=" << _node_id
                                   << ", master node id: " << this->node_id()
@@ -1507,7 +1511,9 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
 
             COUNTER_SET(_input_rows_counter, _number_input_rows);
             COUNTER_SET(_output_rows_counter, _number_output_rows);
-            COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
+            COUNTER_SET(_filtered_rows_counter,
+                        _number_filtered_rows +
+                                state->num_rows_filtered_in_strict_mode_partial_update());
             COUNTER_SET(_send_data_timer, _send_data_ns);
             COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
             COUNTER_SET(_filter_timer, _filter_ns);
@@ -1527,7 +1533,9 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
             int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
                                           state->num_rows_load_unselected();
             state->set_num_rows_load_total(num_rows_load_total);
-            state->update_num_rows_load_filtered(_number_filtered_rows);
+            state->update_num_rows_load_filtered(
+                    _number_filtered_rows +
+                    state->num_rows_filtered_in_strict_mode_partial_update());
             state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
 
            // print log of add batch time of all node, for tracing load performance easily
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 148efec1a2..08a3240e00 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -69,6 +69,7 @@ message PTabletInfo {
     repeated string invalid_dict_cols = 3; 
     // total rows num received by DeltaWriter
     optional int64 received_rows = 4;
+    optional int64 num_rows_filtered = 5 [default = 0];
 }
 
 // open a tablet writer
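
Declaring the new PTabletInfo field optional with a default of 0 keeps the
wire format backward compatible: a BE that predates this change simply never
sets it, and the sender only reads it behind a presence check, as the
vtablet_sink.cpp hunk above does:

    if (tablet.has_num_rows_filtered()) {
        _state->update_num_rows_filtered_in_strict_mode_partial_update(
                tablet.num_rows_filtered());
    }
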
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out
new file mode 100644
index 0000000000..ab2a296079
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      kevin   18      shenzhen        400     2023-07-01T12:00
+
+-- !sql --
+1      kevin   18      shenzhen        500     2023-07-03T12:00:01
+
+-- !sql --
+1      kevin   18      shenzhen        400     2023-07-01T12:00
+
+-- !sql --
+1      kevin   18      shenzhen        400     2023-07-01T12:00
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
similarity index 78%
copy from regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
copy to regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
index 4da4159790..df17d1cd5d 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
@@ -16,9 +16,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_partial_update_upsert", "p0") {
+suite("test_partial_update_strict_mode", "p0") {
 
-    def tableName = "test_partial_update_upsert1"
+    def tableName = "test_partial_update_strict_mode"
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
             CREATE TABLE ${tableName} ( 
@@ -48,17 +48,27 @@ suite("test_partial_update_upsert", "p0") {
         set 'format', 'csv'
         set 'partial_columns', 'true'
         set 'columns', 'id,balance,last_access_time'
-        set 'strict_mode', 'false'
+        set 'strict_mode', 'true'
+        set 'max_filter_ratio', '1'
 
         file 'upsert.csv'
         time 10000 // limit inflight 10s
+
+        check {result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(3, json.NumberTotalRows)
+            assertEquals(1, json.NumberLoadedRows)
+            assertEquals(2, json.NumberFilteredRows)
+        }
     }
     sql "sync"
     qt_sql """select * from ${tableName} order by id;"""
     sql """ DROP TABLE IF EXISTS ${tableName} """
 
 
-    def tableName2 = "test_partial_update_upsert2"
+    def tableName2 = "test_partial_update_strict_mode2"
     sql """ DROP TABLE IF EXISTS ${tableName2} """
     sql """
             CREATE TABLE ${tableName2} ( 
@@ -89,6 +99,7 @@ suite("test_partial_update_upsert", "p0") {
         set 'partial_columns', 'true'
         set 'columns', 'id,balance,last_access_time'
         set 'strict_mode', 'true'
+        set 'max_filter_ratio', '0.5'
 
         file 'upsert.csv'
         time 10000 // limit inflight 10s
@@ -96,8 +107,13 @@ suite("test_partial_update_upsert", "p0") {
         check {result, exception, startTime, endTime ->
             assertTrue(exception == null)
             def json = parseJson(result)
-            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals("Fail", json.Status)
+            assertEquals("[INTERNAL_ERROR]too many filtered rows", 
json.Message)
+            assertEquals(3, json.NumberTotalRows)
+            assertEquals(1, json.NumberLoadedRows)
+            assertEquals(2, json.NumberFilteredRows)
         }
     }
-    sql """ DROP TABLE IF EXISTS ${tableName} """
+    qt_sql """select * from ${tableName2} order by id;"""
+    sql """ DROP TABLE IF EXISTS ${tableName2} """
 }
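
The two loads in this suite deliberately bracket the max_filter_ratio
threshold: upsert.csv carries 3 rows, 2 of which hit no existing key, so the
filter ratio works out to 2/3 ~= 0.67. The first load tolerates it
(0.67 <= 1) and succeeds with NumberFilteredRows=2, while the second rejects
it (0.67 > 0.5) and fails with "too many filtered rows".
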
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
index 4da4159790..34d3c82d72 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
@@ -99,5 +99,5 @@ suite("test_partial_update_upsert", "p0") {
             assertEquals("fail", json.Status.toLowerCase())
         }
     }
-    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ DROP TABLE IF EXISTS ${tableName2} """
 }

