TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3517969565


##########
fe/fe-core/src/main/java/org/apache/doris/qe/TimeBasedChangeVisibleWaiter.java:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.tso.TSOTimestamp;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Before executing a time-based incremental read, block until every transaction that committed at
+ * or before the requested read timestamp of the target tables becomes visible. This guarantees the
+ * read sees a complete set of changes up to that time point.
+ *
+ * <p>Skipped entirely when the session enables eventual-consistent change reads, or when no table
+ * is involved. Waiting is bounded by session variable {@code change_visible_timeout_ms}; timing out
+ * raises a {@link UserException}.
+ */
+public class TimeBasedChangeVisibleWaiter {
+    private final ConnectContext context;
+
+    public static void waitForVisible(ConnectContext context, Plan plan, Map<List<String>, TableIf> tables)
+            throws UserException {
+        if (context.getSessionVariable().isEnableEventualConsistentChange() || tables.isEmpty()) {
+            return;
+        }
+        Map<Long, Map<Long, Long>> dbToTableEndTSO = collectDbToTableEndTSO(
+                context, plan, tables, System.currentTimeMillis());
+        new TimeBasedChangeVisibleWaiter(context).waitForDbToTableEndTSO(dbToTableEndTSO);
+    }
+
+    private TimeBasedChangeVisibleWaiter(ConnectContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Walk the plan, pick out relations doing an incremental read, and for each OlapTable record the
+     * read end timestamp (converted to a full TSO) aggregated as dbId -> (tableId -> max endTSO).
+     */
+    @VisibleForTesting
+    static Map<Long, Map<Long, Long>> collectDbToTableEndTSO(ConnectContext context, Plan plan,
+            Map<List<String>, TableIf> tables, long defaultEndTsMs) {
+        Map<Long, Map<Long, Long>> dbToTableEndTSO = new HashMap<>();
+        plan.foreach(node -> {
+            if (!(node instanceof UnboundRelation)) {
+                return;
+            }
+            UnboundRelation relation = (UnboundRelation) node;
+            TableScanParams scanParams = relation.getScanParams();
+            if (scanParams == null || !scanParams.incrementalRead()) {
+                return;
+            }
+            TableIf table = tables.get(RelationUtil.getQualifierName(context, relation.getNameParts()));
+            if (table instanceof OlapTable) {
+                addTableEndTSO(dbToTableEndTSO, (OlapTable) table, getEndTsMs(scanParams, defaultEndTsMs));
+            }
+        });
+        return dbToTableEndTSO;
+    }
+
+    /**
+     * For each db, scan its committed-but-not-visible transactions; whenever a transaction's commit
+     * TSO falls within a target table's endTSO, wait for that transaction to become visible.
+     */
+    private void waitForDbToTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO) throws UserException {
+        if (dbToTableEndTSO.isEmpty()) {
+            return;
+        }
+        long deadlineMs = System.currentTimeMillis() + context.getSessionVariable().getChangeVisibleTimeoutMs();
+        for (Map.Entry<Long, Map<Long, Long>> dbEntry : dbToTableEndTSO.entrySet()) {
+            long dbId = dbEntry.getKey();
+            Map<Long, Long> tableEndTSO = dbEntry.getValue();
+            for (TransactionState txn : getCommittedTransactions(dbId)) {
+                Pair<Long, Long> matchedTableEndTSO = findMatchedTableEndTSO(txn, tableEndTSO);
+                if (matchedTableEndTSO != null) {
+                    waitTransactionVisible(txn, dbId, matchedTableEndTSO.first, matchedTableEndTSO.second,
+                            deadlineMs);
+                }
+            }
+        }
+    }
+
+    /**
+     * Return (tableId, endTSO) if the transaction is COMMITTED and its commit TSO is within the
+     * requested endTSO of one of its tables; otherwise null (no need to wait).
+     */
+    private Pair<Long, Long> findMatchedTableEndTSO(TransactionState txn, Map<Long, Long> tableEndTSO) {
+        long commitTSO = txn.getCommitTSO();
+        if (txn.getTransactionStatus() != TransactionStatus.COMMITTED || commitTSO < 0) {
+            return null;
+        }
+        for (Long tableId : txn.getTableIdList()) {
+            Long endTSO = tableEndTSO.get(tableId);
+            if (endTSO != null && commitTSO <= endTSO) {
+                return Pair.of(tableId, endTSO);
+            }
+        }
+        return null;
+    }
+
+    private List<TransactionState> getCommittedTransactions(long dbId) throws UserException {
+        try {
+            return Env.getCurrentGlobalTransactionMgr().getCommittedTransactions(dbId);
+        } catch (Exception e) {
+            throw new UserException("get committed transactions failed. dbId=" + dbId, e);
+        }
+    }
+
+    /**
+     * Poll-wait until the transaction leaves COMMITTED (becomes visible) or the deadline passes;
+     * throw if it is still COMMITTED at timeout.
+     */
+    private void waitTransactionVisible(TransactionState txn, long dbId, long tableId,
+            long endTSO, long deadlineMs) throws UserException {
+        long remainingMs = deadlineMs - System.currentTimeMillis();
+        while (txn.getTransactionStatus() == TransactionStatus.COMMITTED && remainingMs > 0) {
+            try {
+                txn.waitTransactionVisible(remainingMs);
+            } catch (InterruptedException ignored) {
+                // Keep the previous wait behavior.
+            }
+            remainingMs = deadlineMs - System.currentTimeMillis();
+        }
+        if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            throw new UserException(String.format(
+                    "timeout waiting transaction become visible for time-based read, "
+                            + "txnId=%d dbId=%d tableId=%d endTSO=%d",
+                    txn.getTransactionId(), dbId, tableId, endTSO));
+        }
+    }
+
+    // Resolve the read end timestamp (ms) from scan params; fall back to defaultEndTsMs (the query
+    // start time) when absent or non-positive.
+    private static long getEndTsMs(TableScanParams scanParams, long defaultEndTsMs) {
+        if (scanParams.getMapParams().containsKey(OlapScanNode.OLAP_END_TIMESTAMP)) {
+            long endTsMs = OlapScanNode.parseChangeTimestamp(
+                    scanParams.getMapParams().get(OlapScanNode.OLAP_END_TIMESTAMP));
+            return endTsMs > 0 ? endTsMs : defaultEndTsMs;
+        }
+        return defaultEndTsMs;
+    }
+
+    // Compose endTsMs into a full TSO and merge into dbId -> (tableId -> endTSO), keeping the max.
+    private static void addTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO, OlapTable table, long endTsMs) {
+        dbToTableEndTSO.computeIfAbsent(table.getDatabase().getId(), ignored -> new HashMap<>())

Review Comment:
   not accurate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to