morrySnow commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3340334899
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -72,49 +113,125 @@ public Plan
visitLogicalOlapTableStreamScan(LogicalOlapTableStreamScan scan, Voi
&& ((SlotReference)
slot).getOriginalColumn().isPresent()
&& ((SlotReference) slot).getOriginalColumn().get()
.equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)))
- .collect(Collectors.toList());
+ .collect(Collectors.toList()));
- if (originSlots.equals(newSlots)) {
- return scan;
- }
-
- // add delete sign column if unique base table
- Slot deleteSlot = null;
- for (Column column : scan.getTable().getBaseSchema(true)) {
- if (column.getName().equals(Column.DELETE_SIGN)) {
- deleteSlot =
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- newSlots.add(deleteSlot);
- break;
+ // history plan
+ if (!historicalPartitionIds.isEmpty()) {
+ List<Slot> scanSlots = new ArrayList<>(newSlots);
+ // add delete sign column if unique base table
+ Slot deleteSlot = null;
+ for (Column column : scan.getTable().getBaseSchema(true)) {
+ if (column.getName().equals(Column.DELETE_SIGN)) {
+ deleteSlot =
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
+ column, scan.qualified());
+ scanSlots.add(deleteSlot);
+ break;
+ }
}
- }
- Plan plan = scan.withCachedOutput(newSlots);
- if (deleteSlot != null) {
- Expression conjunct = new EqualTo(deleteSlot, new
TinyIntLiteral((byte) 0));
- if (!scan.getTable().getEnableUniqueKeyMergeOnWrite()) {
- plan = scan.withPreAggStatus(PreAggStatus.off(
- Column.DELETE_SIGN + " is used as conjuncts."));
+ Plan plan =
scan.withSelectedPartitionIds(historicalPartitionIds, true)
+ .withCachedOutput(new ArrayList<>(scanSlots))
+ .withNormalized(true);
+ if (deleteSlot != null) {
+ Expression conjunct = new EqualTo(deleteSlot, new
TinyIntLiteral((byte) 0));
+ if (!scan.getTable().getEnableUniqueKeyMergeOnWrite()) {
+ plan = scan.withPreAggStatus(PreAggStatus.off(
Review Comment:
这里应该基于上面已经构造好的 `plan` 调 `withPreAggStatus`,而不是回到原始 `scan`。当前写法会丢掉 line
131-133 设置的 `historicalPartitionIds`、包含 `deleteSlot` 的 `cachedOutput` 以及
`normalized` 标记;对 UNIQUE MOR stream 的历史扫描,filter child 可能不再包含这个 delete
slot,分区也会恢复成原始 scan 的范围。
##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -74,6 +111,426 @@ Status BlockReader::next_block_with_aggregation(Block*
block, bool* eof) {
return res;
}
+Status BlockReader::_ensure_binlog_column_pos(const Block& src_block) {
+ if (_binlog_column_pos_inited) {
+ if (_binlog_op_pos >= 0 && _binlog_op_pos < src_block.columns() &&
+ src_block.get_by_position(_binlog_op_pos).name ==
kRowBinlogOpColName) {
+ return Status::OK();
+ }
+ _binlog_op_pos = -1;
+ _binlog_lsn_pos = -1;
+ _binlog_timestamp_pos = -1;
+ _binlog_column_pos_inited = false;
+ }
+
+ const size_t col_num = src_block.columns();
+ size_t col_names_total_length = 0;
+
+ for (size_t i = 0; i < col_num; ++i) {
+ const auto& name = src_block.get_by_position(i).name;
+ col_names_total_length += name.size();
+ if (name == kRowBinlogOpColName) {
+ _binlog_op_pos = static_cast<int>(i);
+ } else if (name == kRowBinlogLsnColName) {
+ _binlog_lsn_pos = static_cast<int>(i);
+ } else if (name == kRowBinlogTimestampColName) {
+ _binlog_timestamp_pos = static_cast<int>(i);
+ }
+ }
+
+ if (_binlog_op_pos < 0) {
+ std::string col_names;
+ col_names.reserve(col_names_total_length + (col_num > 0 ? (col_num -
1) * 2 : 0));
+ if (col_num > 0) {
+ col_names.append(src_block.get_by_position(0).name);
+ for (size_t i = 1; i < col_num; ++i) {
+ col_names.append(", ");
+ col_names.append(src_block.get_by_position(i).name);
+ }
+ }
+ return Status::InternalError("row binlog op column not found, block
columns: {}",
+ col_names);
+ }
+
+ _binlog_column_pos_inited = true;
+ return Status::OK();
+}
+
+int64_t BlockReader::_read_binlog_op(const IColumn& col, size_t row) const {
+ const IColumn* cur = &col;
+ if (const auto* nullable = check_and_get_column<ColumnNullable>(*cur)) {
+ if (nullable->is_null_at(row)) {
+ return ROW_BINLOG_UNKNOWN;
+ }
+ cur = &nullable->get_nested_column();
+ }
+
+ if (const auto* int64_col = check_and_get_column<ColumnInt64>(*cur)) {
+ return int64_col->get_element(row);
+ }
+
+ return ROW_BINLOG_UNKNOWN;
+}
+
+Status BlockReader::_write_binlog_op(IColumn& col, int64_t op) const {
+ IColumn* cur = &col;
+ ColumnNullable* nullable = nullptr;
+ if (auto* n = typeid_cast<ColumnNullable*>(cur)) {
+ nullable = n;
+ cur = &nullable->get_nested_column();
+ }
+
+ if (auto* int64_col = typeid_cast<ColumnInt64*>(cur)) {
+ int64_col->insert_value(op);
+ } else {
+ return Status::InternalError("invalid column type");
+ }
+
+ if (nullable != nullptr) {
+ nullable->get_null_map_data().push_back(0);
+ }
+ return Status::OK();
+}
+
+bool BlockReader::_is_binlog_meta_column(int idx) const {
+ return idx == _binlog_op_pos || idx == _binlog_lsn_pos || idx ==
_binlog_timestamp_pos;
+}
+
+int BlockReader::_resolve_source_column_index(const Block& src_block, int idx,
+ bool use_before) const {
+ if (!use_before || _is_binlog_meta_column(idx)) {
+ return idx;
+ }
+
+ return resolve_before_column_index(src_block, idx, _binlog_op_pos);
+}
+
+void BlockReader::_init_pending_row_columns(const Block& block) {
+ if (!_pending_row_columns.empty()) {
+ return;
+ }
+ _pending_row_columns = block.clone_empty_columns();
+}
+
+bool BlockReader::_emit_pending_row(Block* block, MutableColumns&
target_columns,
+ size_t& output_row_count, bool* eof) {
+ if (!_has_pending_row) {
+ return false;
+ }
+
+ for (size_t i = 0; i < _pending_row_columns.size(); ++i) {
+ target_columns[i]->insert_from(*_pending_row_columns[i], 0);
+ _pending_row_columns[i]->clear();
+ }
+ _has_pending_row = false;
+ output_row_count++;
+
+ if (_eof) {
+ block->set_columns(std::move(target_columns));
+ *eof = false;
+ return true;
+ }
+
+ return false;
+}
+
+Status BlockReader::_append_change_row(MutableColumns& target_columns, const
Block& src_block,
+ size_t row_pos, int64_t output_op, bool
use_before) {
+ RETURN_IF_ERROR(_ensure_binlog_column_pos(src_block));
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (target_col_idx < 0) {
+ continue;
+ }
+ if (idx == _binlog_op_pos) {
+ RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
output_op));
+ continue;
+ }
+ int source_idx = _resolve_source_column_index(src_block, idx,
use_before);
+ RETURN_IF_ERROR(insert_cell_with_wrapper_adaptation(
+ *target_columns[target_col_idx],
*src_block.get_by_position(source_idx).column,
+ row_pos));
+ }
+ return Status::OK();
+}
+
+Status BlockReader::_min_delta_next_block(Block* block, bool* eof) {
+ if (UNLIKELY(_eof && !_has_pending_row)) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ if (_stored_data_columns.empty()) {
+ _stored_data_columns = _next_row.block->clone_empty_columns();
+ }
+
+ auto target_columns_guard = block->mutate_columns_scoped();
+ auto& target_columns = target_columns_guard.mutable_columns();
+ size_t output_row_count = 0;
+
+ _init_pending_row_columns(*block);
+ if (_emit_pending_row(block, target_columns, output_row_count, eof)) {
+ return Status::OK();
+ }
+
+ while (output_row_count < _reader_context.batch_size && !_eof) {
+ if (_stored_data_columns[0]->empty()) {
+ for (size_t i = 0; i < _stored_data_columns.size(); ++i) {
+
_stored_data_columns[i]->insert_from(*_next_row.block->get_by_position(i).column,
+ _next_row.row_pos);
+ }
+ }
+
+ IteratorRowRef last_row_ref = _next_row;
+
+ auto res = _vcollect_iter.next(&_next_row);
+ if (UNLIKELY(res.is<END_OF_FILE>())) {
+ _eof = true;
+ *eof = true;
+ } else if (UNLIKELY(!res.ok())) {
+ return res;
+ }
+
+ if (!_eof && _min_delta_next_row_has_same_key()) {
+ continue;
+ }
+
+ if (UNLIKELY(last_row_ref.block == nullptr)) {
+ return Status::InternalError("invalid row reference in min-delta
stream reader");
+ }
+ RETURN_IF_ERROR(_ensure_binlog_column_pos(*last_row_ref.block));
+ auto first_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos],
0);
+
+ auto& last_op_col =
last_row_ref.block->get_by_position(_binlog_op_pos).column;
+ auto last_op = _read_binlog_op(*last_op_col, last_row_ref.row_pos);
+
+ auto result = AggregateFunctionMinDelta::calculate_result(first_op,
last_op);
+ switch (result) {
+ case AggregateFunctionMinDelta::ResultType::SKIP:
+ break;
+ case AggregateFunctionMinDelta::ResultType::INSERT:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+ STREAM_CHANGE_INSERT));
+ } else {
+ target_columns[target_col_idx]->insert_from(
+ *last_row_ref.block->get_by_position(idx).column,
last_row_ref.row_pos);
+ }
+ }
+ output_row_count++;
+ break;
+ case AggregateFunctionMinDelta::ResultType::DELETE:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+ STREAM_CHANGE_DELETE));
+ } else {
+ target_columns[target_col_idx]->insert_from(
+ *last_row_ref.block->get_by_position(idx).column,
last_row_ref.row_pos);
+ }
+ }
+ output_row_count++;
+ break;
+ case AggregateFunctionMinDelta::ResultType::UPDATE_BEFORE_AFTER:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_BEFORE));
+ } else if (idx == _binlog_lsn_pos) {
+ target_columns[target_col_idx]->insert_from(
+ *last_row_ref.block->get_by_position(idx).column,
last_row_ref.row_pos);
+ } else {
+ int source_idx =
_resolve_source_column_index(*last_row_ref.block, idx, true);
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[source_idx],
+ 0);
+ }
+ }
+ output_row_count++;
+
+ if (output_row_count >= _reader_context.batch_size) {
+ for (auto& col : _pending_row_columns) {
+ col->clear();
+ }
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*_pending_row_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_AFTER));
+ } else {
+ _pending_row_columns[target_col_idx]->insert_from(
+
*last_row_ref.block->get_by_position(idx).column,
+ last_row_ref.row_pos);
+ }
+ }
+ _has_pending_row = true;
+ } else {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_AFTER));
+ } else {
+ target_columns[target_col_idx]->insert_from(
+
*last_row_ref.block->get_by_position(idx).column,
+ last_row_ref.row_pos);
+ }
+ }
+ output_row_count++;
+ }
+ break;
+ }
+
+ for (auto& col : _stored_data_columns) {
+ col->clear();
+ }
+ }
+
+ block->set_columns(std::move(target_columns));
+ *eof = _eof && !_has_pending_row;
+ return Status::OK();
+}
+
+Status BlockReader::_detail_change_next_block(Block* block, bool* eof) {
+ auto output_template_block = block->clone_empty();
+ auto target_columns_guard = block->mutate_columns_scoped();
+ auto& target_columns = target_columns_guard.mutable_columns();
+ size_t output_row_count = 0;
+ _init_pending_row_columns(*block);
+ if (_emit_pending_row(block, target_columns, output_row_count, eof)) {
+ return Status::OK();
+ }
+
+ while (output_row_count < _reader_context.batch_size) {
+ if (_vcollect_iter.is_merge()) {
+ if (_eof) {
+ break;
+ }
+ if (UNLIKELY(_next_row.block == nullptr)) {
+ return Status::InternalError("invalid row reference in detail
stream reader");
+ }
+ const Block& source_block = *_next_row.block;
+ const size_t row = _next_row.row_pos;
+ RETURN_IF_ERROR(_ensure_binlog_column_pos(source_block));
+ int64_t op =
_read_binlog_op(*source_block.get_by_position(_binlog_op_pos).column, row);
+ if (op == ROW_BINLOG_UPDATE) {
+ RETURN_IF_ERROR(_append_change_row(target_columns,
source_block, row,
+
STREAM_CHANGE_UPDATE_BEFORE, true));
+ output_row_count++;
+ if (output_row_count >= _reader_context.batch_size) {
+ for (auto& col : _pending_row_columns) {
+ col->clear();
+ }
+ RETURN_IF_ERROR(_append_change_row(_pending_row_columns,
source_block, row,
+
STREAM_CHANGE_UPDATE_AFTER, false));
+ _has_pending_row = true;
+ } else {
+ RETURN_IF_ERROR(_append_change_row(target_columns,
source_block, row,
+
STREAM_CHANGE_UPDATE_AFTER, false));
+ output_row_count++;
+ }
+ } else if (op == ROW_BINLOG_APPEND) {
+ RETURN_IF_ERROR(_append_change_row(target_columns,
source_block, row,
+ STREAM_CHANGE_INSERT,
false));
+ output_row_count++;
+ } else if (op == ROW_BINLOG_DELETE) {
+ RETURN_IF_ERROR(_append_change_row(target_columns,
source_block, row,
+ STREAM_CHANGE_DELETE,
false));
+ output_row_count++;
+ }
+
+ auto res = _vcollect_iter.next(&_next_row);
+ if (UNLIKELY(res.is<END_OF_FILE>())) {
+ _eof = true;
+ *eof = true;
+ } else if (UNLIKELY(!res.ok())) {
+ return res;
+ }
+ continue;
+ }
+
+ DCHECK(_next_row.block != nullptr);
+ auto source_template_block = _next_row.block->clone_empty();
+ Block source_block_storage;
+ source_block_storage = source_template_block.clone_empty();
+ Block* read_block = &source_block_storage;
+ Status res = _vcollect_iter.next(read_block);
+ if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
+ return res;
+ }
+ *eof = res.is<END_OF_FILE>();
+ _eof = *eof;
+ const Block& source_block = *read_block;
+ if (source_block.rows() == 0) {
+ break;
+ }
+ RETURN_IF_ERROR(_ensure_binlog_column_pos(source_block));
+ auto result_columns = output_template_block.clone_empty_columns();
+
+ for (size_t row = 0;
Review Comment:
这里按 `source_block` 循环时只处理到 `output_row_count < batch_size`。一旦输出 batch 满,尤其是
`ROW_BINLOG_UPDATE` 一条输入会输出 before/after 两行,循环会提前停止,然后下面直接 `return
Status::OK()`,但没有保存当前 `source_block` 和下一行位置。下一次调用会重新
`_vcollect_iter.next(read_block)` 读取下一个 block,当前 block 里未处理的 change 行就丢了。需要像
merge 分支一样保留未处理的 row cursor,或者让非 merge 分支也逐行推进。
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -243,6 +257,28 @@ private LogicalPlan makeOlapScan(TableIf table,
UnboundRelation unboundRelation,
CollectionUtils.isEmpty(partIds) ? ((OlapTable)
table).getPartitionIds() : partIds, indexId,
preAggStatus, CollectionUtils.isEmpty(partIds) ?
ImmutableList.of() : partIds,
unboundRelation.getHints(),
unboundRelation.getTableSample(), ImmutableList.of());
+ } else if (isChangeRead(unboundRelation)) {
+ if (unboundRelation.getScanParams() != null &&
unboundRelation.getScanParams().incrementalRead()) {
+ OlapTable olapTable = (OlapTable) table;
+ ChangeScanInfo changeScanInfo =
buildChangeScanInfo(unboundRelation.getScanParams());
+ validateChangeReadRequirements(olapTable, changeScanInfo);
+ unboundRelation =
unboundRelation.withChangeScanInfo(changeScanInfo);
+ }
+ OlapTable olapTable = (OlapTable) table;
+ RowBinlogTableWrapper wrapper = new
RowBinlogTableWrapper(olapTable);
+ ChangeScanInfo changeScanInfo =
unboundRelation.getChangeScanInfo().get();
+ Preconditions.checkState(changeScanInfo != null);
+ if (changeScanInfo.getInformationKind() ==
ChangeScanInfo.InformationKind.DETAIL
+ && !Config.enable_split_binlog_before) {
+ scan = new LogicalOlapScan(unboundRelation.getRelationId(),
Review Comment:
这个分支会创建普通 `LogicalOlapScan`,而 `LogicalOlapScan.isIncrementalScan()` 固定返回
false。后面 `PhysicalPlanTranslator` 虽然会 `applyChangeScanInfo`,但 `OlapScanNode` 的
`incrementalScan` 仍是 false,于是 `addScanRangeLocations` 只走 `else if
(hasChangeScan)`,不会设置 `binlog_scan_type`。当 `enable_split_binlog_before=false`
时,BE 收到的是 `binlog_read_source=CHANGES` 但 `binlog_scan_type=NONE`,`DETAIL` 的
change 转换路径不会启用。这里需要仍然生成 incremental/binlog scan,或者在非 incremental change scan
分支也下发 scan type。
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
return result;
}
- public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+ public void cleanupStalePartitionOffsets() {
+ List<Long> staleDbIds = new ArrayList<>();
+ List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+ List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new
ArrayList<>();
+ for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+ Optional<Database> db =
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+ if (!db.isPresent()) {
+ staleDbIds.add(entry.getKey());
+ continue;
+ }
+ for (Long tableId : entry.getValue()) {
+ Optional<Table> table = db.get().getTable(tableId);
+ if (!table.isPresent()) {
+ staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+ continue;
+ }
+ if (!(table.get() instanceof OlapTableStream)) {
+ staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+ continue;
+ }
+ cleanupStalePartitionOffsets((OlapTableStream)
table.get()).ifPresent(pruneEntries::add);
+ }
+ }
+ removeStaleDbAndStream(staleDbIds, staleStreamIds);
+ if (!pruneEntries.isEmpty()) {
+
Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(
+ new PruneTableStreamPartitionOffsetInfo(pruneEntries));
+ }
+ }
+
+ private Optional<PruneTableStreamPartitionOffsetInfo.Entry>
cleanupStalePartitionOffsets(OlapTableStream stream) {
+ if (stream.isDisabled() || stream.isStale()) {
+ return Optional.empty();
+ }
+ OlapTable baseTable = stream.getBaseTableNullable();
+ if (baseTable == null) {
+ return Optional.empty();
+ }
+ Set<Long> validPartitionIds;
+ if (!baseTable.tryReadLock(Table.TRY_LOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip cleaning stream {} because base table {} read
lock is busy",
+ stream.getName(), baseTable.getName());
+ }
+ return Optional.empty();
+ }
+ try {
+ if (baseTable.isDropped) {
+ return Optional.empty();
+ }
+ validPartitionIds = new HashSet<>(baseTable.getPartitionIds());
+ } finally {
+ baseTable.readUnlock();
+ }
+ if (!stream.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip cleaning stream {} because stream write lock
is busy", stream.getName());
+ }
+ return Optional.empty();
+ }
+ try {
+ if (stream.isDisabled() || stream.isStale()) {
+ return Optional.empty();
+ }
+ Set<Long> stalePartitionIds =
stream.unprotectedCollectStalePartitionOffsetIds(validPartitionIds);
+ if (stalePartitionIds.isEmpty()) {
+ return Optional.empty();
+ }
+ int removedPartitionCount =
stream.unprotectedPrunePartitionOffsets(stalePartitionIds);
+ if (removedPartitionCount > 0) {
+ LOG.info("cleaned {} stale partition offset entries from
stream {}.{} ({})",
+ removedPartitionCount,
stream.getDatabase().getFullName(), stream.getName(), stream.getId());
+ }
+ return Optional.of(new PruneTableStreamPartitionOffsetInfo.Entry(
+ stream.getDatabase().getId(), stream.getId(),
stalePartitionIds));
+ } finally {
+ stream.writeUnlock();
+ }
+ }
+
+ public void
replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo
info) {
+ if (info == null || info.getEntries() == null ||
info.getEntries().isEmpty()) {
+ return;
+ }
+ for (PruneTableStreamPartitionOffsetInfo.Entry entry :
info.getEntries()) {
+ replayPruneTableStreamPartitionOffsets(entry);
+ }
+ }
+
+ private void
replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo.Entry
entry) {
+ if (entry == null || entry.getPartitionIds() == null ||
entry.getPartitionIds().isEmpty()) {
+ return;
+ }
+ Optional<Database> db =
Env.getCurrentInternalCatalog().getDb(entry.getDbId());
+ if (!db.isPresent()) {
+ LOG.info("skip replay pruning partition offsets because db {} does
not exist", entry.getDbId());
+ return;
+ }
+ Optional<Table> table = db.get().getTable(entry.getStreamId());
+ if (!table.isPresent()) {
+ LOG.info("skip replay pruning partition offsets because stream
{}.{} does not exist",
+ entry.getDbId(), entry.getStreamId());
+ return;
+ }
+ if (!(table.get() instanceof OlapTableStream)) {
+ LOG.info("skip replay pruning partition offsets because table
{}.{} is not an olap table stream",
+ entry.getDbId(), entry.getStreamId());
+ return;
+ }
+ OlapTableStream stream = (OlapTableStream) table.get();
+ if (!stream.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
Review Comment:
EditLog replay 不能因为 transient lock timeout 就跳过。Follower 回放或 master failover
后回放时,如果 stream 正被读锁持有,这里 `return` 会让该 FE 永久丢掉这条 prune journal,导致 partition
offset 状态和 master 不一致。cleanup daemon 采集阶段可以 try-lock 跳过,但 replay
阶段应该阻塞拿写锁,或失败/重试,保证回放和 master-side mutation 等价。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]