VividByteWorker commented on code in PR #63110:
URL: https://github.com/apache/doris/pull/63110#discussion_r3225445395


##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -788,7 +788,7 @@ Result<std::unique_ptr<RowsetWriter>> 
CloudTablet::create_rowset_writer(
 // after writer, merge this transient rowset with original rowset
 Result<std::unique_ptr<RowsetWriter>> 
CloudTablet::create_transient_rowset_writer(
         const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info,
-        int64_t txn_expiration) {
+        int64_t txn_expiration, bool build_row_binlog) {

Review Comment:
   Is `build_row_binlog` actually used? It looks like an unused parameter.



##########
be/src/exec/scan/parallel_scanner_builder.cpp:
##########
@@ -254,8 +254,18 @@ Status ParallelScannerBuilder::_load() {
 std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
         BaseTabletSPtr tablet, int64_t version, const 
std::vector<OlapScanRange*>& key_ranges,
         TabletReadSource&& read_source) {
-    OlapScanner::Params params {_state,  _scanner_profile.get(), key_ranges, 
std::move(tablet),
-                                version, std::move(read_source), _limit,     
_is_preaggregation};
+    OlapScanner::Params params {

Review Comment:
   Should parallel scan be disabled for the binlog reader? We need to preserve 
the order in which binlog entries are read.



##########
be/src/cloud/cloud_tablets_channel.cpp:
##########
@@ -36,12 +37,15 @@ 
CloudTabletsChannel::CloudTabletsChannel(CloudStorageEngine& engine, const Table
 CloudTabletsChannel::~CloudTabletsChannel() = default;
 
 std::unique_ptr<BaseDeltaWriter> CloudTabletsChannel::create_delta_writer(
-        const WriteRequest& request) {
+        const std::shared_ptr<WriteRequest>& request) {

Review Comment:
   Why change the parameter from a `const WriteRequest&` to a `const std::shared_ptr<WriteRequest>&`?



##########
be/src/storage/rowset/segment_creator.cpp:
##########
@@ -69,7 +70,9 @@ Status SegmentFlusher::flush_single_block(const Block* block, 
int32_t segment_id
     }
     Block flush_block(*block);
     bool no_compression = flush_block.bytes() <= 
config::segment_compression_threshold_kb * 1024;
-    if (config::enable_vertical_segment_writer) {
+    bool use_vertical_segment_writer = config::enable_vertical_segment_writer 
&&
+                                       
!_context.write_binlog_opt().is_binlog_writer();

Review Comment:
   So the horizontal segment writer is always used for binlog? What happens if the 
table is wide, e.g. 1000 columns?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RowBinlogTableWrapper.java:
##########
@@ -37,4 +37,18 @@ public RowBinlogTableWrapper(OlapTable originTable) {
     public long getBaseIndexId() {
         return rowBinlogMeta.getIndexId();
     }
+
+    @Override
+    public MaterializedIndex getPartitionIndex(Partition partition, long 
indexId) {

Review Comment:
   Why not just `return originTable.getPartitionIndex(partition, indexId);`?



##########
be/src/storage/segment/segment_iterator.cpp:
##########
@@ -2388,6 +2389,120 @@ void 
SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>
     VLOG_DEBUG << "replaced version column in segment iterator, 
version_col_idx:" << version_idx;
 }
 
+void SegmentIterator::_update_lsn_col_if_needed(const std::vector<ColumnId>& 
column_ids,
+                                                size_t num_rows) {
+    // | real version(64) | auto-inc row_id(64) |
+    if (_opts.version.first != _opts.version.second) {
+        return;
+    }
+
+    if (_opts.io_ctx.reader_type != ReaderType::READER_BINLOG &&
+        _opts.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
+        return;
+    }
+
+    int32_t lsn_col_idx = _schema->lsn_col_idx();
+    if (lsn_col_idx < 0 || std::ranges::find(column_ids, lsn_col_idx) == 
column_ids.end()) {
+        return;
+    }
+
+    if (_is_pred_column[lsn_col_idx]) {
+        auto* lsn_column = assert_cast<PredicateColumnType<TYPE_LARGEINT>*>(
+                _current_return_columns[lsn_col_idx].get());
+        std::vector<Int128> binlog_lsns;
+        binlog_lsns.reserve(num_rows);
+        static constexpr Int128 kLow64Mask = (static_cast<Int128>(1) << 64) - 
1;
+        for (size_t j = 0; j < num_rows; j++) {
+            const Int128 row_id = lsn_column->get_data()[j];
+            
binlog_lsns.emplace_back((static_cast<Int128>(_opts.version.second) << 64) |
+                                     (row_id & kLow64Mask));
+        }
+        _current_return_columns[lsn_col_idx]->clear();
+        for (const auto& binlog_lsn : binlog_lsns) {
+            lsn_column->insert_data(reinterpret_cast<const 
char*>(&binlog_lsn), 0);
+        }
+        return;
+    }
+
+    auto* lsn_column = 
assert_cast<ColumnInt128*>(_current_return_columns[lsn_col_idx].get());
+    const auto* column_desc = _schema->column(lsn_col_idx);
+    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
+    DCHECK(column_desc->type() == FieldType::OLAP_FIELD_TYPE_LARGEINT);
+    auto* col_ptr = assert_cast<ColumnInt128*>(column.get());
+
+    static constexpr Int128 kLow64Mask = (static_cast<Int128>(1) << 64) - 1;
+    for (size_t j = 0; j < num_rows; j++) {
+        const Int128 row_id = lsn_column->get_element(j);
+        const Int128 binlog_lsn =
+                (static_cast<Int128>(_opts.version.second) << 64) | (row_id & 
kLow64Mask);
+        col_ptr->insert_value(binlog_lsn);
+    }
+    _current_return_columns[lsn_col_idx] = std::move(column);
+}
+
+void SegmentIterator::_update_tso_col_if_needed(const std::vector<ColumnId>& 
column_ids,
+                                                size_t num_rows) {
+    // use commit tso to replace timestamp col
+    if (_opts.version.first != _opts.version.second) {
+        return;
+    }
+
+    if (_opts.io_ctx.reader_type != ReaderType::READER_BINLOG &&
+        _opts.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
+        return;
+    }
+
+    int32_t tso_col_idx = _schema->tso_col_idx();
+    if (tso_col_idx < 0 || std::ranges::find(column_ids, tso_col_idx) == 
column_ids.end()) {
+        return;
+    }
+
+    DCHECK_EQ(_opts.commit_tso.start_tso(), _opts.commit_tso.end_tso());
+    Int64 commit_tso = _opts.commit_tso.end_tso() == -1 ? 0 : 
_opts.commit_tso.end_tso();

Review Comment:
   commit_tso is composed of two parts: a physical timestamp and a logical counter.
   I think only the physical timestamp is needed here. Should the logical counter be 
stripped, e.g. `commit_tso = commit_tso >> 18;`?



##########
conf/be.conf:
##########
@@ -31,11 +31,11 @@ 
JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzz
 JEMALLOC_PROF_PRFIX="jemalloc_heap_profile_"
 
 # ports for admin, web, heartbeat service 
-be_port = 9060
-webserver_port = 8040
-heartbeat_service_port = 9050
-brpc_port = 8060
-arrow_flight_sql_port = 8050
+be_port = 9061

Review Comment:
   Why were the default ports changed (e.g. be_port 9060 -> 9061)?



##########
be/src/storage/task/engine_publish_version_task.cpp:
##########
@@ -520,18 +528,22 @@ void AsyncTabletPublishTask::handle() {
     std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
     _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
-    _engine.txn_manager()->get_txn_related_tablets(_transaction_id, 
_partition_id,
-                                                   &tablet_related_rs);
+    std::map<TabletInfo, std::vector<RowsetSharedPtr>> 
tablet_related_attach_rowsets;
+    _engine.txn_manager()->get_txn_related_tablets(
+            _transaction_id, _partition_id, &tablet_related_rs, 
&tablet_related_attach_rowsets);
     auto iter = tablet_related_rs.find(TabletInfo(_tablet->tablet_id(), 
_tablet->tablet_uid()));
     if (iter == tablet_related_rs.end()) {
         return;
     }
+    auto attach_rowsets_it = tablet_related_attach_rowsets.find(
+            TabletInfo(_tablet->tablet_id(), _tablet->tablet_uid()));
+    DCHECK(attach_rowsets_it != tablet_related_attach_rowsets.end());
     RowsetSharedPtr rowset = iter->second;
     Version version(_version, _version);
 
-    auto publish_status =
-            publish_version_and_add_rowset(_engine, _partition_id, _tablet, 
rowset, _transaction_id,
-                                           version, nullptr, _stats, 
_commit_tso);
+    auto publish_status = publish_version_and_add_rowset(_engine, 
_partition_id, _tablet, rowset,
+                                                         
attach_rowsets_it->second, _transaction_id,
+                                                         version, nullptr, 
_stats, _commit_tso);

Review Comment:
   `_commit_tso`, `_transaction_id`, and `_stats` are members of TabletPublishTxnTask.
   I think publish_version_and_add_rowset can use them directly; there is no need 
to pass them as arguments.



##########
be/src/storage/tablet/tablet_manager.cpp:
##########
@@ -480,6 +480,9 @@ TabletSharedPtr 
TabletManager::_create_tablet_meta_and_dir_unlocked(
                 _gen_tablet_dir(data_dir->path(), tablet_meta->shard_id(), 
request.tablet_id);
         string schema_hash_dir = path_util::join_path_segments(
                 tablet_dir, std::to_string(request.tablet_schema.schema_hash));
+        bool has_row_binlog = tablet_meta->binlog_config().is_enable() &&
+                              tablet_meta->binlog_config().isRowBinlogFormat();

Review Comment:
   ```suggestion
                                 
tablet_meta->binlog_config().is_row_binlog_format();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to