VividByteWorker commented on code in PR #63110:
URL: https://github.com/apache/doris/pull/63110#discussion_r3225445395
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -788,7 +788,7 @@ Result<std::unique_ptr<RowsetWriter>>
CloudTablet::create_rowset_writer(
// after writer, merge this transient rowset with original rowset
Result<std::unique_ptr<RowsetWriter>>
CloudTablet::create_transient_rowset_writer(
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info,
- int64_t txn_expiration) {
+ int64_t txn_expiration, bool build_row_binlog) {
Review Comment:
Is build_row_binlog unused here?
##########
be/src/exec/scan/parallel_scanner_builder.cpp:
##########
@@ -254,8 +254,18 @@ Status ParallelScannerBuilder::_load() {
std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
BaseTabletSPtr tablet, int64_t version, const
std::vector<OlapScanRange*>& key_ranges,
TabletReadSource&& read_source) {
- OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges,
std::move(tablet),
- version, std::move(read_source), _limit,
_is_preaggregation};
+ OlapScanner::Params params {
Review Comment:
Should parallel scan be disabled for the binlog reader? We need to preserve
the order in which the binlog is read.
##########
be/src/cloud/cloud_tablets_channel.cpp:
##########
@@ -36,12 +37,15 @@
CloudTabletsChannel::CloudTabletsChannel(CloudStorageEngine& engine, const Table
CloudTabletsChannel::~CloudTabletsChannel() = default;
std::unique_ptr<BaseDeltaWriter> CloudTabletsChannel::create_delta_writer(
- const WriteRequest& request) {
+ const std::shared_ptr<WriteRequest>& request) {
Review Comment:
why change reference to shared_ptr?
##########
be/src/storage/rowset/segment_creator.cpp:
##########
@@ -69,7 +70,9 @@ Status SegmentFlusher::flush_single_block(const Block* block,
int32_t segment_id
}
Block flush_block(*block);
bool no_compression = flush_block.bytes() <=
config::segment_compression_threshold_kb * 1024;
- if (config::enable_vertical_segment_writer) {
+ bool use_vertical_segment_writer = config::enable_vertical_segment_writer
&&
+
!_context.write_binlog_opt().is_binlog_writer();
Review Comment:
So for binlog, the horizontal segment writer is used? What if the table is
wide, with 1000 columns?
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RowBinlogTableWrapper.java:
##########
@@ -37,4 +37,18 @@ public RowBinlogTableWrapper(OlapTable originTable) {
public long getBaseIndexId() {
return rowBinlogMeta.getIndexId();
}
+
+ @Override
+ public MaterializedIndex getPartitionIndex(Partition partition, long
indexId) {
Review Comment:
why not just call return originTable.getPartitionIndex(partition, indexId)?
##########
be/src/storage/segment/segment_iterator.cpp:
##########
@@ -2388,6 +2389,120 @@ void
SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>
VLOG_DEBUG << "replaced version column in segment iterator,
version_col_idx:" << version_idx;
}
+void SegmentIterator::_update_lsn_col_if_needed(const std::vector<ColumnId>&
column_ids,
+ size_t num_rows) {
+ // | real version(64) | auto-inc row_id(64) |
+ if (_opts.version.first != _opts.version.second) {
+ return;
+ }
+
+ if (_opts.io_ctx.reader_type != ReaderType::READER_BINLOG &&
+ _opts.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
+ return;
+ }
+
+ int32_t lsn_col_idx = _schema->lsn_col_idx();
+ if (lsn_col_idx < 0 || std::ranges::find(column_ids, lsn_col_idx) ==
column_ids.end()) {
+ return;
+ }
+
+ if (_is_pred_column[lsn_col_idx]) {
+ auto* lsn_column = assert_cast<PredicateColumnType<TYPE_LARGEINT>*>(
+ _current_return_columns[lsn_col_idx].get());
+ std::vector<Int128> binlog_lsns;
+ binlog_lsns.reserve(num_rows);
+ static constexpr Int128 kLow64Mask = (static_cast<Int128>(1) << 64) -
1;
+ for (size_t j = 0; j < num_rows; j++) {
+ const Int128 row_id = lsn_column->get_data()[j];
+
binlog_lsns.emplace_back((static_cast<Int128>(_opts.version.second) << 64) |
+ (row_id & kLow64Mask));
+ }
+ _current_return_columns[lsn_col_idx]->clear();
+ for (const auto& binlog_lsn : binlog_lsns) {
+ lsn_column->insert_data(reinterpret_cast<const
char*>(&binlog_lsn), 0);
+ }
+ return;
+ }
+
+ auto* lsn_column =
assert_cast<ColumnInt128*>(_current_return_columns[lsn_col_idx].get());
+ const auto* column_desc = _schema->column(lsn_col_idx);
+ auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
+ DCHECK(column_desc->type() == FieldType::OLAP_FIELD_TYPE_LARGEINT);
+ auto* col_ptr = assert_cast<ColumnInt128*>(column.get());
+
+ static constexpr Int128 kLow64Mask = (static_cast<Int128>(1) << 64) - 1;
+ for (size_t j = 0; j < num_rows; j++) {
+ const Int128 row_id = lsn_column->get_element(j);
+ const Int128 binlog_lsn =
+ (static_cast<Int128>(_opts.version.second) << 64) | (row_id &
kLow64Mask);
+ col_ptr->insert_value(binlog_lsn);
+ }
+ _current_return_columns[lsn_col_idx] = std::move(column);
+}
+
+void SegmentIterator::_update_tso_col_if_needed(const std::vector<ColumnId>&
column_ids,
+ size_t num_rows) {
+ // use commit tso to replace timestamp col
+ if (_opts.version.first != _opts.version.second) {
+ return;
+ }
+
+ if (_opts.io_ctx.reader_type != ReaderType::READER_BINLOG &&
+ _opts.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
+ return;
+ }
+
+ int32_t tso_col_idx = _schema->tso_col_idx();
+ if (tso_col_idx < 0 || std::ranges::find(column_ids, tso_col_idx) ==
column_ids.end()) {
+ return;
+ }
+
+ DCHECK_EQ(_opts.commit_tso.start_tso(), _opts.commit_tso.end_tso());
+ Int64 commit_tso = _opts.commit_tso.end_tso() == -1 ? 0 :
_opts.commit_tso.end_tso();
Review Comment:
commit_tso is composed of two parts: physical timestamp + logical counter.
Here I think only the physical timestamp is required?
commit_tso = commit_tso >> 18; ?
##########
conf/be.conf:
##########
@@ -31,11 +31,11 @@
JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzz
JEMALLOC_PROF_PRFIX="jemalloc_heap_profile_"
# ports for admin, web, heartbeat service
-be_port = 9060
-webserver_port = 8040
-heartbeat_service_port = 9050
-brpc_port = 8060
-arrow_flight_sql_port = 8050
+be_port = 9061
Review Comment:
Why is the default port changed?
##########
be/src/storage/task/engine_publish_version_task.cpp:
##########
@@ -520,18 +528,22 @@ void AsyncTabletPublishTask::handle() {
std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
- _engine.txn_manager()->get_txn_related_tablets(_transaction_id,
_partition_id,
- &tablet_related_rs);
+ std::map<TabletInfo, std::vector<RowsetSharedPtr>>
tablet_related_attach_rowsets;
+ _engine.txn_manager()->get_txn_related_tablets(
+ _transaction_id, _partition_id, &tablet_related_rs,
&tablet_related_attach_rowsets);
auto iter = tablet_related_rs.find(TabletInfo(_tablet->tablet_id(),
_tablet->tablet_uid()));
if (iter == tablet_related_rs.end()) {
return;
}
+ auto attach_rowsets_it = tablet_related_attach_rowsets.find(
+ TabletInfo(_tablet->tablet_id(), _tablet->tablet_uid()));
+ DCHECK(attach_rowsets_it != tablet_related_attach_rowsets.end());
RowsetSharedPtr rowset = iter->second;
Version version(_version, _version);
- auto publish_status =
- publish_version_and_add_rowset(_engine, _partition_id, _tablet,
rowset, _transaction_id,
- version, nullptr, _stats,
_commit_tso);
+ auto publish_status = publish_version_and_add_rowset(_engine,
_partition_id, _tablet, rowset,
+
attach_rowsets_it->second, _transaction_id,
+ version, nullptr,
_stats, _commit_tso);
Review Comment:
_commit_tso/_transaction_id/_stats are members of TabletPublishTxnTask.
I think we can use them directly in publish_version_and_add_rowset; there is
no need to pass them through arguments.
##########
be/src/storage/tablet/tablet_manager.cpp:
##########
@@ -480,6 +480,9 @@ TabletSharedPtr
TabletManager::_create_tablet_meta_and_dir_unlocked(
_gen_tablet_dir(data_dir->path(), tablet_meta->shard_id(),
request.tablet_id);
string schema_hash_dir = path_util::join_path_segments(
tablet_dir, std::to_string(request.tablet_schema.schema_hash));
+ bool has_row_binlog = tablet_meta->binlog_config().is_enable() &&
+ tablet_meta->binlog_config().isRowBinlogFormat();
Review Comment:
```suggestion
tablet_meta->binlog_config().is_row_binlog_format();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]