yujun777 commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3361113253
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java:
##########
@@ -895,12 +897,19 @@ private static List<RewriteJob> getWholeTreeRewriteJobs(
ImmutableSet.of(LogicalCTEAnchor.class),
() -> {
List<RewriteJob> rewriteJobs =
Lists.newArrayListWithExpectedSize(300);
- rewriteJobs.add(
- topic("normalize olap table stream scan",
- custom(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN,
- NormalizeOlapTableStreamScan::new)
- )
- );
+ if (Config.enable_table_stream) {
Review Comment:
`NormalizeOlapTableBinlogScan` is also needed for base-table `@incr(...)`
scans, but this whole block is gated by `Config.enable_table_stream`.
`BindRelation` accepts `table@incr(...)` based on row binlog requirements and
does not require `enable_table_stream`, so with binlog enabled but table stream
disabled, the change scan can skip normalization.
In that case the scan remains non-incremental: `OlapScanNode` sets the
start/end timestamps in the `hasChangeScan` branch but never sets
`binlog_scan_type`, so the DETAIL/MIN_DELTA/APPEND_ONLY semantics fall back to
the BE defaults. Either `@incr` should be gated by the same config, or the
binlog normalization rule should run independently of the table-stream flag.
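A minimal sketch of the second option: gate the binlog-scan normalization on its own flag and keep only the stream-specific job behind the table-stream flag. The two booleans stand in for `Config.enable_feature_binlog` and `Config.enable_table_stream`, and the string job names are hypothetical placeholders rather than the real `Rewriter`/`RewriteJob` API. Running the binlog rule unconditionally (as a no-op when there is no `@incr` scan) would be an equally valid choice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how the normalize jobs could be gated. The flags stand
// in for Config.enable_feature_binlog and Config.enable_table_stream, and the
// job names are plain strings; this is not the real Doris Rewriter API.
public class RewriteJobGating {

    public static final String BINLOG_SCAN_JOB = "normalize olap table binlog scan";
    public static final String STREAM_SCAN_JOB = "normalize olap table stream scan";

    public static List<String> buildNormalizeJobs(boolean binlogEnabled, boolean tableStreamEnabled) {
        List<String> jobs = new ArrayList<>();
        // Base-table table@incr(...) scans are bound whenever row binlog is
        // available, so their normalization must not depend on table streams.
        if (binlogEnabled) {
            jobs.add(BINLOG_SCAN_JOB);
        }
        // Stream-only normalization can stay behind the table-stream flag.
        if (tableStreamEnabled) {
            jobs.add(STREAM_SCAN_JOB);
        }
        return jobs;
    }

    public static void main(String[] args) {
        // Binlog on, table stream off: the @incr scan is still normalized.
        System.out.println(buildNormalizeJobs(true, false));
    }
}
```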
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java:
##########
@@ -1745,7 +1764,11 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState
continue;
}
partitionCommitInfo.setVersion(partition.getNextVersion());
- partitionCommitInfo.setVersionTime(System.currentTimeMillis());
+ if (Config.enable_feature_binlog && table.enableTso()) {
+ partitionCommitInfo.setVersionTime(commitTSO);
Review Comment:
The 2PC path does not populate the new `PartitionCommitInfo.tso` field. The
normal commit paths use `generatePartitionCommitInfo(..., commitTSO)`, but here
we only set the version and then put `commitTSO` into `versionTime`. During
publish, `partitionCommitInfo.getTso()` is still the default `-1`, so
`partition.updateVisibleVersionAndTime(version, versionTime, tso)` leaves
`Partition.tso` invalid for row-binlog/table-stream offset tracking. It also
stores an encoded TSO value as the visible-version time.
This branch should set the partition commit TSO separately from `versionTime`,
as the non-2PC commit paths do.
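A simplified sketch contrasting the current 2PC branch with the suggested one. `CommitInfo` is a hypothetical stand-in that only mirrors the `version`, `versionTime`, and `tso` fields mentioned above; it is not the real `PartitionCommitInfo`. The `tsoEnabled` flag stands for `Config.enable_feature_binlog && table.enableTso()`, and the sketch assumes `versionTime` should stay a wall-clock millisecond timestamp.

```java
// Hypothetical, simplified stand-in for the 2PC commit step. CommitInfo only
// mirrors the fields named in the review (version, versionTime, tso); it is not
// the real org.apache.doris PartitionCommitInfo class.
public class TwoPcCommitSketch {

    public static final long INVALID_TSO = -1L;

    public static final class CommitInfo {
        public long version;
        public long versionTime;
        public long tso = INVALID_TSO; // default that publish would otherwise see
    }

    // Current shape of the 2PC branch: the TSO is written into versionTime and
    // the tso field is left at its -1 default.
    public static CommitInfo commitAsToday(long nextVersion, long nowMillis,
                                           boolean tsoEnabled, long commitTso) {
        CommitInfo info = new CommitInfo();
        info.version = nextVersion;
        info.versionTime = tsoEnabled ? commitTso : nowMillis;
        return info;
    }

    // Suggested shape: versionTime stays a wall-clock timestamp and the commit
    // TSO goes into its own field, like the non-2PC paths.
    public static CommitInfo commitWithTso(long nextVersion, long nowMillis,
                                           boolean tsoEnabled, long commitTso) {
        CommitInfo info = new CommitInfo();
        info.version = nextVersion;
        info.versionTime = nowMillis;
        if (tsoEnabled) {
            info.tso = commitTso;
        }
        return info;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        CommitInfo before = commitAsToday(10, now, true, 123456789L);
        CommitInfo after = commitWithTso(10, now, true, 123456789L);
        System.out.println("before: tso=" + before.tso + ", versionTime=" + before.versionTime);
        System.out.println("after:  tso=" + after.tso + ", versionTime=" + after.versionTime);
    }
}
```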
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -295,17 +297,28 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec
List<ScanNode> tableStreamScanNodes =
buildResult.planner.getScanNodes().stream()
- .filter(s -> s.getTableIf() instanceof OlapTableStreamWrapper).collect(Collectors.toList());
+ .filter((s -> (s.getTableIf() instanceof OlapTableStreamWrapper
Review Comment:
This also matches base-table `@incr(...)` scans, not only table-stream
scans. `BindRelation` builds base-table CHANGES reads as incremental
`OlapScanNode`s over `RowBinlogTableWrapper`, but those wrappers are created
without a parent stream. Then line 320 does `((RowBinlogTableWrapper)
scanNode.getTableIf()).getParent()` and line 323 dereferences the result, so
`INSERT INTO ... SELECT ... FROM base_table@incr(...)` can throw a
NullPointerException or incorrectly try to register a stream offset update for
a non-stream read.
This filter should only include real stream consumption scans, e.g.
`OlapTableStreamWrapper` or a `RowBinlogTableWrapper` whose parent is non-null.
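A sketch of the narrower predicate, using hypothetical stand-ins for `TableIf`, `ScanNode`, `OlapTableStreamWrapper`, and `RowBinlogTableWrapper` that model only what the filter needs. The parent is modeled as a nullable `String` for brevity; in the real code it is presumably a stream object, which is an assumption here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the Doris types named in the review; only the
// parts the filter needs are modeled.
public class StreamScanFilterSketch {

    public interface TableIf {}

    public static final class OlapTableStreamWrapper implements TableIf {}

    public static final class RowBinlogTableWrapper implements TableIf {
        private final String parentStream; // null for base-table @incr(...) reads

        public RowBinlogTableWrapper(String parentStream) {
            this.parentStream = parentStream;
        }

        public String getParent() {
            return parentStream;
        }
    }

    public static final class ScanNode {
        private final TableIf table;

        public ScanNode(TableIf table) {
            this.table = table;
        }

        public TableIf getTableIf() {
            return table;
        }
    }

    // True only for scans that consume a stream and therefore need an offset update.
    public static boolean isStreamConsumptionScan(ScanNode scan) {
        TableIf table = scan.getTableIf();
        if (table instanceof OlapTableStreamWrapper) {
            return true;
        }
        return table instanceof RowBinlogTableWrapper
                && ((RowBinlogTableWrapper) table).getParent() != null;
    }

    public static List<ScanNode> tableStreamScanNodes(List<ScanNode> scans) {
        return scans.stream()
                .filter(StreamScanFilterSketch::isStreamConsumptionScan)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ScanNode> scans = Arrays.asList(
                new ScanNode(new OlapTableStreamWrapper()),
                new ScanNode(new RowBinlogTableWrapper("s1")),
                new ScanNode(new RowBinlogTableWrapper(null))); // base_table@incr(...)
        // Prints 2: the parentless base-table change scan is excluded.
        System.out.println(tableStreamScanNodes(scans).size());
    }
}
```

With this predicate, the later `getParent()` dereference only ever runs on scans that are known to have a parent stream.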