github-actions[bot] commented on code in PR #61199:
URL: https://github.com/apache/doris/pull/61199#discussion_r2992774631
##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3361,6 +3361,50 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, masterOnly = true)
public static long mow_get_ms_lock_retry_backoff_interval = 80;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "Whether to enable TSO."}, varType = VariableAnnotation.EXPERIMENTAL)
+ public static boolean enable_tso_feature = false;
+
+ @ConfField(mutable = false, masterOnly = true, description = {
+ "TSO service update interval in milliseconds. Default is 50, which means the TSO service "
+ + "will perform timestamp update checks every 50 milliseconds."})
+ public static int tso_service_update_interval_ms = 50;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "TSO service max retry count. Default is 3, which means the TSO service will retry 3 times"
+ + "to update the global timestamp."})
+ public static int tso_max_update_retry_count = 3;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "TSO get max retry count. Default is 10, which means the TSO service will retry 10 times"
+ + "to generate TSO."})
+ public static int tso_max_get_retry_count = 10;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "TSO service time window in milliseconds. Default is 5000, which means the TSO service"
+ + "will apply for a TSO time window of 5000ms from BDBJE once."})
+ public static int tso_service_window_duration_ms = 5000;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "Max tolerated clock backward threshold during TSO calibration in milliseconds. "
+ + "Exceeding this threshold will fail enabling TSO. Default is 30 minutes."})
+ public static long tso_clock_backward_startup_threshold_ms = 30L * 60 * 1000;
+
+ @ConfField(mutable = true, description = {
+ "TSO service time offset in milliseconds. Only for test. Default is 0, which means the TSO service"
+ + "timestamp offset is 0 milliseconds."})
+ public static int tso_time_offset_debug_mode = 0;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "Whether to enable persisting TSO window end into edit log. Enabling emits new op code,"
+ + " which may break rollback to older versions."})
+ public static boolean enable_tso_persist_journal = false;
Review Comment:
**[High] No TSO persistence by default risks monotonicity violation.**
Both `enable_tso_persist_journal` (here) and `enable_tso_checkpoint_module`
default to `false`. This means:
1. On FE restart, `windowEndTSO` will be 0, because no window end was
persisted for replay
2. `calibrateTimestamp()` will therefore set physical time to
`System.currentTimeMillis()`
3. If NTP adjusts the clock backward between restarts, previously issued
TSOs could be reissued
This breaks the "monotonically increasing" guarantee stated in the PR
description. For an experimental feature, consider either:
- Making `enable_tso_persist_journal = true` the default when
`enable_tso_feature` is enabled, OR
- Clearly documenting that TSO monotonicity across restarts requires
enabling these persistence flags, OR
- Having `calibrateTimestamp()` refuse to start if
`enable_tso_persist_journal` is false (to make it fail-fast)
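To make the restart scenario concrete, here is a minimal sketch of the calibration rule documented on `calibrateTimestamp()` in this PR (`Tnext = Tlast + 1` if `Tnow - Tlast < 1`, otherwise `Tnow`), evaluated with and without a replayed window end. The class name, method name, and all timestamp values are hypothetical and chosen only for illustration; this is not the PR's code.

```java
// Hypothetical illustration; not part of the PR.
final class CalibrationSketch {
    // Calibration rule as described on calibrateTimestamp():
    // if now - lastWindowEnd < 1 then lastWindowEnd + 1, otherwise now.
    static long nextPhysical(long lastWindowEnd, long now) {
        return (now - lastWindowEnd < 1) ? lastWindowEnd + 1 : now;
    }

    public static void main(String[] args) {
        long lastIssuedPhysical = 1_000_500L;   // physical part of a TSO issued before restart (made up)
        long persistedWindowEnd = 1_005_000L;   // window end that would have been replayed (made up)
        long nowAfterBackwardJump = 1_000_000L; // wall clock after NTP stepped it back (made up)

        long withPersistence = nextPhysical(persistedWindowEnd, nowAfterBackwardJump);
        long withoutPersistence = nextPhysical(0L, nowAfterBackwardJump);

        System.out.println("with persistence: " + withPersistence
                + ", monotonic: " + (withPersistence > lastIssuedPhysical));
        System.out.println("without persistence: " + withoutPersistence
                + ", monotonic: " + (withoutPersistence > lastIssuedPhysical));
    }
}
```

With a replayed window end, the next physical time lands past everything issued earlier. With `windowEndTSO == 0`, it falls back to the wall clock, which in this example is behind a timestamp that was already handed out.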
##########
fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java:
##########
@@ -0,0 +1,503 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tso;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.io.CountingDataOutputStream;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.journal.local.LocalJournal;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.persist.EditLog;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TSOService extends MasterDaemon {
+ private static final Logger LOG = LogManager.getLogger(TSOService.class);
+
+ // Global timestamp with physical time and logical counter
+ private final TSOTimestamp globalTimestamp = new TSOTimestamp();
+ // Lock for thread-safe access to global timestamp
+ private final ReentrantLock lock = new ReentrantLock();
+ // Guard value for time window updates (in milliseconds)
+ private static final long UPDATE_TIME_WINDOW_GUARD = 1;
+
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private final AtomicBoolean fatalClockBackwardReported = new AtomicBoolean(false);
+ private final AtomicLong windowEndTSO = new AtomicLong(0);
+
+ private static final class TSOClockBackwardException extends RuntimeException {
+ private TSOClockBackwardException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * Constructor initializes the TSO service with update interval
+ */
+ public TSOService() {
+ super("TSO-service", Config.tso_service_update_interval_ms);
+ }
+
+ /**
+ * Start the TSO service.
+ */
+ @Override
+ public synchronized void start() {
+ super.start();
+ }
+
+ /**
+ * Periodically update timestamp after catalog is ready
+ * This method is called by the MasterDaemon framework
+ */
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!Config.enable_tso_feature) {
+ isInitialized.set(false);
Review Comment:
**[Medium] Resetting `isInitialized` while `getTSO()` may be in-flight.**
When `enable_tso_feature` is dynamically set to `false`, this code resets
`isInitialized` to `false`. However, a concurrent transaction commit thread
could be between the `isInitialized.get()` check at line 159 and the
`generateTSO()` call at line 189. This is a time-of-check-to-time-of-use
(TOCTOU) race: the in-flight call has already passed the check and goes on to
generate a TSO after the service has been marked uninitialized.
Moreover, `fatalClockBackwardReported` is also reset here. If the feature is
re-enabled and a clock backward is detected again, the
`TSOClockBackwardException` at line 99 (a `RuntimeException` subclass) would be
thrown again, potentially crashing the daemon thread depending on how
`MasterDaemon` handles it.
Consider:
1. Not resetting `isInitialized` when disabling — just check
`Config.enable_tso_feature` at the `getTSO()` entry point
2. Or use a separate flag for "feature disabled" vs "not yet calibrated"
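A minimal sketch of the second option, assuming a volatile boolean standing in for `Config.enable_tso_feature` and an `AtomicLong` standing in for the real timestamp generator. All names here are hypothetical and not taken from the PR. The point is only that disabling the feature never clears the calibration state, so a call that already passed the gate still works against a valid timestamp.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration; names are not from the PR.
final class TsoGateSketch {
    // Stands in for Config.enable_tso_feature, which can change at runtime.
    private volatile boolean featureEnabled = false;
    // "Calibrated" is tracked separately and is never cleared by disabling the feature.
    private final AtomicBoolean calibrated = new AtomicBoolean(false);
    // Stand-in for the real timestamp generator.
    private final AtomicLong counter = new AtomicLong(0);

    void setFeatureEnabled(boolean enabled) {
        featureEnabled = enabled;
    }

    void markCalibrated() {
        calibrated.set(true);
    }

    boolean isCalibrated() {
        return calibrated.get();
    }

    long getTso() {
        // "Feature disabled" and "not yet calibrated" are reported as distinct conditions.
        if (!featureEnabled) {
            throw new IllegalStateException("TSO feature is disabled");
        }
        if (!calibrated.get()) {
            throw new IllegalStateException("TSO timestamp is not calibrated");
        }
        return counter.incrementAndGet();
    }

    public static void main(String[] args) {
        TsoGateSketch gate = new TsoGateSketch();
        gate.setFeatureEnabled(true);
        gate.markCalibrated();
        System.out.println("first TSO: " + gate.getTso());
        gate.setFeatureEnabled(false);
        System.out.println("still calibrated after disable: " + gate.isCalibrated());
    }
}
```

This does not remove the window between the check and the generation. It only ensures that the state read after the check is still valid, and that re-enabling the feature does not force a recalibration.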
##########
fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java:
##########
@@ -102,13 +102,15 @@ public final class FeMetaVersion {
public static final int VERSION_139 = 139;
public static final int VERSION_140 = 140;
+ // For tso
+ public static final int VERSION_141 = 141;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_140;
+ public static final int VERSION_CURRENT = VERSION_141;
// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
// these clause will be useless and we could remove them
- public static final int MINIMUM_VERSION_REQUIRED = VERSION_140;
+ public static final int MINIMUM_VERSION_REQUIRED = VERSION_141;
Review Comment:
**[Medium] `MINIMUM_VERSION_REQUIRED` should not be bumped to
`VERSION_141`.**
Bumping `MINIMUM_VERSION_REQUIRED` means FE instances running this code will
**refuse to load any meta image written by an FE with version < 141**. Since
the TSO image module is conditionally written (`enable_tso_checkpoint_module`
defaults to false), and the `"tso"` module in `PersistMetaModules` won't
produce data unless enabled, there's no reason to require version 141 as
minimum.
This bump prevents rollback to the previous FE version even when TSO is not
in use. Consider keeping `MINIMUM_VERSION_REQUIRED = VERSION_140` and only
bumping `VERSION_CURRENT`.
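The effect of the two constants can be sketched as follows, assuming the loader rejects any image whose version is below the minimum, as described above. `canLoad` is a hypothetical helper written for this comment; it is not the actual Doris image-loading code, and the upper-bound check is an assumption as well.

```java
// Hypothetical illustration; not the real Doris version check.
final class MetaVersionGateSketch {
    static final int VERSION_140 = 140;
    static final int VERSION_141 = 141;

    // Assumed rule: an image is loadable if minimumRequired <= imageVersion <= current.
    static boolean canLoad(int imageVersion, int minimumRequired, int current) {
        return imageVersion >= minimumRequired && imageVersion <= current;
    }

    public static void main(String[] args) {
        // As in this PR: both constants bumped to 141.
        System.out.println("v140 image, minimum 141: "
                + canLoad(VERSION_140, VERSION_141, VERSION_141));
        // As suggested: only VERSION_CURRENT bumped.
        System.out.println("v140 image, minimum 140: "
                + canLoad(VERSION_140, VERSION_140, VERSION_141));
    }
}
```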
##########
fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java:
##########
@@ -0,0 +1,503 @@
+ /**
+ * Periodically update timestamp after catalog is ready
+ * This method is called by the MasterDaemon framework
+ */
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!Config.enable_tso_feature) {
+ isInitialized.set(false);
+ fatalClockBackwardReported.set(false);
+ return;
+ }
+ int maxUpdateRetryCount = Math.max(1, Config.tso_max_update_retry_count);
+ boolean updated = false;
+ Throwable lastFailure = null;
+ if (!isInitialized.get()) {
+ for (int i = 0; i < maxUpdateRetryCount; i++) {
+ if (isInitialized.get()) {
+ break;
+ }
+ LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp");
+ try {
+ calibrateTimestamp();
+ } catch (TSOClockBackwardException e) {
+ lastFailure = e;
+ if (fatalClockBackwardReported.compareAndSet(false, true)) {
+ LOG.error("TSO service calibrate timestamp failed due to clock backward beyond threshold", e);
+ throw e;
+ }
+ return;
+ } catch (Exception e) {
+ lastFailure = e;
+ LOG.warn("TSO service calibrate timestamp failed", e);
+ }
+ if (!isInitialized.get()) {
+ try {
+ sleep(Config.tso_service_update_interval_ms);
+ } catch (InterruptedException ie) {
+ LOG.warn("TSO service sleep interrupted", ie);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (!isInitialized.get()) {
+ return;
+ }
+ }
+
+ for (int i = 0; i < maxUpdateRetryCount; i++) {
+ try {
+ updateTimestamp();
+ updated = true;
+ break;
+ } catch (Exception e) {
+ lastFailure = e;
+ LOG.warn("TSO service update timestamp failed, retry: {}", i, e);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED.increase(1L);
+ }
+ try {
+ sleep(Config.tso_service_update_interval_ms);
+ } catch (InterruptedException ie) {
+ LOG.warn("TSO service sleep interrupted", ie);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ if (updated) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TSO service updated timestamp");
+ }
+ } else if (lastFailure != null) {
+ LOG.warn("TSO service update timestamp failed after {} retries",
+ maxUpdateRetryCount, lastFailure);
+ } else {
+ LOG.warn("TSO service update timestamp failed after {} retries", maxUpdateRetryCount);
+ }
+ }
+
+ /**
+ * Generate a single TSO timestamp
+ *
+ * @return Composed TSO timestamp combining physical time and logical counter
+ * @throws RuntimeException if TSO is not calibrated or other errors occur
+ */
+ public long getTSO() {
+ if (!isInitialized.get()) {
+ throw new RuntimeException("TSO timestamp is not calibrated, please check");
+ }
+ int maxGetTSORetryCount = Math.max(1, Config.tso_max_get_retry_count);
+ RuntimeException lastFailure = null;
+ for (int i = 0; i < maxGetTSORetryCount; i++) {
+ // Wait for environment to be ready and ensure we're running on master FE
+ Env env = Env.getCurrentEnv();
+ if (env == null || !env.isReady()) {
+ LOG.warn("TSO service wait for catalog ready");
+ lastFailure = new RuntimeException("Env is null or not ready");
+ try {
+ sleep(200);
+ } catch (InterruptedException ie) {
+ LOG.warn("TSO service sleep interrupted", ie);
+ Thread.currentThread().interrupt();
+ }
+ continue;
+ } else if (!env.isMaster()) {
+ LOG.warn("TSO service only run on master FE");
+ lastFailure = new RuntimeException("Current FE is not master");
+ try {
+ sleep(200);
+ } catch (InterruptedException ie) {
+ LOG.warn("TSO service sleep interrupted", ie);
+ Thread.currentThread().interrupt();
+ }
+ continue;
+ }
+
+ Pair<Long, Long> pair = generateTSO();
+ long physical = pair.first;
+ long logical = pair.second;
+
+ if (physical == 0) {
+ throw new RuntimeException("TSO timestamp is not calibrated, please check");
+ }
+
+ // Check for logical counter overflow
+ if (logical > TSOTimestamp.MAX_LOGICAL_COUNTER) {
+ LOG.warn("TSO timestamp logical counter overflow, please check");
+ lastFailure = new RuntimeException("TSO timestamp logical counter overflow");
+ try {
+ sleep(Config.tso_service_update_interval_ms);
+ } catch (InterruptedException ie) {
+ LOG.warn("TSO service sleep interrupted", ie);
+ Thread.currentThread().interrupt();
+ }
+ continue;
+ }
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_TSO_CLOCK_GET_SUCCESS.increase(1L);
+ }
+ return TSOTimestamp.composeTimestamp(physical, logical);
+ }
+ throw new RuntimeException("Failed to get TSO after " + maxGetTSORetryCount + " retries", lastFailure);
+ }
+
+ /**
+ * Get the current composed TSO timestamp
+ *
+ * @return Current TSO timestamp combining physical time and logical counter
+ */
+ public long getCurrentTSO() {
+ lock.lock();
+ try {
+ return globalTimestamp.composeTimestamp();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Calibrate the TSO timestamp when service starts
+ * This ensures the timestamp is consistent with the last persisted value
+ *
+ * Algorithm:
+ * - If Tnow - Tlast < 1ms, then Tnext = Tlast + 1
+ * - Otherwise Tnext = Tnow
+ */
+ private void calibrateTimestamp() {
+ if (isInitialized.get()) {
+ return;
+ }
+ // Check if Env is ready before calibration
+ Env env = Env.getCurrentEnv();
+ if (env == null || !env.isReady() || !env.isMaster()) {
+ LOG.warn("Env is not ready or not master, skip TSO timestamp calibration");
+ return;
+ }
+
+ long timeLast = windowEndTSO.get(); // Last timestamp from image/editlog replay
+ long timeNow = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
+ long backwardMs = timeLast - timeNow;
+ if (backwardMs > Config.tso_clock_backward_startup_threshold_ms) {
+ throw new TSOClockBackwardException("TSO clock backward too much during calibration, backwardMs="
+ + backwardMs + ", thresholdMs=" + Config.tso_clock_backward_startup_threshold_ms
+ + ", lastWindowEndTSO=" + timeLast + ", currentMillis=" + timeNow);
+ }
+
+ // Calculate next physical time to ensure monotonicity
+ long nextPhysicalTime;
+ if (timeNow - timeLast < 1) {
+ nextPhysicalTime = timeLast + 1;
+ } else {
+ nextPhysicalTime = timeNow;
+ }
+
+ // Construct new timestamp (physical time with reset logical counter)
+ setTSOPhysical(nextPhysicalTime, true);
+
+ // Write the right boundary of time window to BDBJE for persistence
+ long timeWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms;
+ windowEndTSO.set(timeWindowEnd);
+ writeTimestampToBDBJE(timeWindowEnd);
+ isInitialized.set(true);
+
+ LOG.info("TSO timestamp calibrated: lastTimestamp={}, currentMillis={}, nextPhysicalTime={}, timeWindowEnd={}",
+ timeLast, timeNow, nextPhysicalTime, timeWindowEnd);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_TSO_CLOCK_CALCULATED.increase(1L);
+ }
+ }
+
+ /**
+ * Update timestamp periodically to maintain time window
+ * This method handles various time-related issues:
+ * 1. Clock drift detection
+ * 2. Clock backward detection
+ * 3. Logical counter overflow handling
+ * 4. Time window renewal
+ */
+ private void updateTimestamp() {
+ // Check if Env is ready
+ Env env = Env.getCurrentEnv();
+ if (env == null || !env.isReady() || !env.isMaster()) {
+ LOG.warn("Env is not ready or not master, skip TSO timestamp update");
+ return;
+ }
+
+ // 1. Check if TSO has been calibrated
+ long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
+ long prevPhysicalTime = 0;
+ long prevLogicalCounter = 0;
+
+ lock.lock();
+ try {
+ prevPhysicalTime = globalTimestamp.getPhysicalTimestamp();
+ prevLogicalCounter = globalTimestamp.getLogicalCounter();
+ } finally {
+ lock.unlock();
+ }
+
+ if (prevPhysicalTime == 0) {
+ LOG.error("TSO timestamp is not calibrated, please check");
Review Comment:
**[Low] `updateTimestamp()` only logs error when `prevPhysicalTime == 0` but
continues execution.**
If `prevPhysicalTime == 0` (meaning TSO was never calibrated), the method
continues to compute `timeLag` using this 0 value, which would always trigger
the clock-drift warning at line 318. The method should `return` early here
since continuing with uncalibrated state produces meaningless warnings and
incorrect state transitions.
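A minimal sketch of the early return, reduced to the part of `updateTimestamp()` that chooses the next physical time. `UPDATE_TIME_WINDOW_GUARD` matches the value in this PR; the `MAX_LOGICAL_COUNTER` value, the class name, and the `OptionalLong` return type are assumptions made so the example is self-contained (the real constant lives in `TSOTimestamp`).

```java
import java.util.OptionalLong;

// Hypothetical illustration; not the PR's updateTimestamp().
final class UpdateGuardSketch {
    static final long UPDATE_TIME_WINDOW_GUARD = 1;          // same value as in the PR
    static final long MAX_LOGICAL_COUNTER = (1L << 18) - 1;  // assumed value for illustration only

    // Returns empty when the timestamp was never calibrated, so the caller
    // skips drift detection and state updates instead of working with 0.
    static OptionalLong nextPhysicalTime(long prevPhysical, long prevLogical, long currentTime) {
        if (prevPhysical == 0) {
            return OptionalLong.empty(); // early return: nothing meaningful to compute
        }
        long timeLag = currentTime - prevPhysical;
        if (timeLag > UPDATE_TIME_WINDOW_GUARD) {
            return OptionalLong.of(currentTime);      // align to the wall clock
        }
        if (prevLogical > MAX_LOGICAL_COUNTER / 2) {
            return OptionalLong.of(prevPhysical + 1); // counter nearly full: advance one millisecond
        }
        return OptionalLong.of(prevPhysical);         // keep the physical time unchanged
    }

    public static void main(String[] args) {
        System.out.println("uncalibrated: " + nextPhysicalTime(0L, 0L, 1_000L));
        System.out.println("calibrated:   " + nextPhysicalTime(1_000L, 0L, 1_050L));
    }
}
```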
##########
fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java:
##########
@@ -0,0 +1,503 @@
+ /**
+ * Update timestamp periodically to maintain time window
+ * This method handles various time-related issues:
+ * 1. Clock drift detection
+ * 2. Clock backward detection
+ * 3. Logical counter overflow handling
+ * 4. Time window renewal
+ */
+ private void updateTimestamp() {
+ // Check if Env is ready
+ Env env = Env.getCurrentEnv();
+ if (env == null || !env.isReady() || !env.isMaster()) {
+ LOG.warn("Env is not ready or not master, skip TSO timestamp update");
+ return;
+ }
+
+ // 1. Check if TSO has been calibrated
+ long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
+ long prevPhysicalTime = 0;
+ long prevLogicalCounter = 0;
+
+ lock.lock();
+ try {
+ prevPhysicalTime = globalTimestamp.getPhysicalTimestamp();
+ prevLogicalCounter = globalTimestamp.getLogicalCounter();
+ } finally {
+ lock.unlock();
+ }
+
+ if (prevPhysicalTime == 0) {
+ LOG.error("TSO timestamp is not calibrated, please check");
+ }
+
+ // 2. Check for serious clock issues
+ long timeLag = currentTime - prevPhysicalTime;
+ if (timeLag >= 3 * Config.tso_service_update_interval_ms) {
+ // Clock drift (time difference too large), log clearly and trigger corresponding metric
+ LOG.warn("TSO clock drift detected, lastPhysicalTime={}, currentTime={}, "
+ + "timeLag={} (exceeds 3 * update interval {})",
+ prevPhysicalTime, currentTime, timeLag, 3 * Config.tso_service_update_interval_ms);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_TSO_CLOCK_DRIFT_DETECTED.increase(1L);
+ }
+ } else if (timeLag < 0) {
+ // Clock backward (current time earlier than last recorded time)
+ // log clearly and trigger corresponding metric
+ LOG.warn("TSO clock backward detected, lastPhysicalTime={}, currentTime={}, "
+ + "timeLag={} (current time is earlier than last physical time)",
+ prevPhysicalTime, currentTime, timeLag);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_TSO_CLOCK_BACKWARD_DETECTED.increase(1L);
+ }
+ }
+
+ // 3. Update time based on conditions
+ long nextPhysicalTime = prevPhysicalTime;
+ if (timeLag > UPDATE_TIME_WINDOW_GUARD) {
+ // Align physical time to current time
+ nextPhysicalTime = currentTime;
+ } else if (prevLogicalCounter > TSOTimestamp.MAX_LOGICAL_COUNTER / 2) {
+ // Logical counter nearly full → advance to next millisecond
+ nextPhysicalTime = prevPhysicalTime + 1;
+ } else {
+ // Logical counter not nearly full → just increment logical counter
+ // do nothing
+ }
+
+ // 4. Check if time window right boundary needs renewal
+ if ((windowEndTSO.get() - nextPhysicalTime) <= UPDATE_TIME_WINDOW_GUARD) {
+ // Time window right boundary needs renewal
+ long nextWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms;
+ windowEndTSO.set(nextWindowEnd);
+ writeTimestampToBDBJE(nextWindowEnd);
+ }
+
+ // 5. Update global timestamp
+ setTSOPhysical(nextPhysicalTime, false);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_TSO_CLOCK_UPDATED.increase(1L);
+ }
+ }
+
+ /**
+ * Write the right boundary of TSO time window to BDBJE for persistence
+ *
+ * @param timestamp The timestamp to write
+ */
+ private void writeTimestampToBDBJE(long timestamp) {
+ try {
+ // Check if Env is ready
+ Env env = Env.getCurrentEnv();
+ if (env == null) {
+ LOG.warn("Env is null, skip writing TSO timestamp to BDBJE");
+ return;
+ }
+
+ // Check if Env is ready and is master
+ if (!env.isReady()) {
+ LOG.warn("Env is not ready, skip writing TSO timestamp to BDBJE");
+ return;
+ }
+
+ if (!env.isMaster()) {
+ LOG.warn("Current node is not master, skip writing TSO timestamp to BDBJE");
+ return;
+ }
+
+ TSOTimestamp tsoTimestamp = new TSOTimestamp(timestamp, 0);
+
+ // Check if EditLog is available
+ EditLog editLog = env.getEditLog();
+ if (editLog == null) {
+ LOG.warn("EditLog is null, skip writing TSO timestamp to BDBJE");
+ return;
+ }
+
+ // Additional check to ensure EditLog's journal is properly initialized
+ if (editLog.getJournal() == null) {
+ LOG.warn("EditLog's journal is null, skip writing TSO timestamp to BDBJE");
+ return;
+ }
+
+ if (editLog.getJournal() instanceof LocalJournal) {
+ if (!((LocalJournal) editLog.getJournal()).isReadyToFlush()) {
+ LOG.warn("EditLog's journal is not ready to flush, skip writing TSO timestamp to BDBJE");
+ return;
+ }
+ }
+
+ if (Config.enable_tso_persist_journal) {
+ editLog.logTSOTimestampWindowEnd(tsoTimestamp);
+ } else {
+ LOG.debug("TSO timestamp {} is not persisted to journal, "
+ + "please check if enable_tso_persist_journal is set to true",
+ tsoTimestamp);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to write TSO timestamp to BDBJE", e);
Review Comment:
**[Low] `writeTimestampToBDBJE` catches all exceptions and only logs.**
Silently swallowing persistence failures means `isInitialized` can be set to
`true` (in `calibrateTimestamp()` at line 274) even when the window end was
never persisted. On FE restart, `windowEndTSO` would be 0, and TSO monotonicity
could be violated.
Consider either:
- Propagating the exception (at least from `calibrateTimestamp()`, where
persistence failure should prevent initialization)
- Or returning a boolean success/failure that the caller can check
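A minimal sketch of the boolean-return variant, assuming the edit-log write is abstracted behind a `LongPredicate` that reports whether the window end was persisted. The class and its members are hypothetical stand-ins, not the PR's `TSOService`; the only behavior being illustrated is that initialization is set strictly after a confirmed write.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongPredicate;

// Hypothetical illustration; not the PR's TSOService.
final class PersistThenInitSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Stand-in for the edit-log write: returns true only if the window end was persisted.
    private final LongPredicate persistWindowEnd;

    PersistThenInitSketch(LongPredicate persistWindowEnd) {
        this.persistWindowEnd = persistWindowEnd;
    }

    // Marks the service initialized only when the window end was persisted.
    boolean calibrate(long nextPhysicalTime, long windowDurationMs) {
        long windowEnd = nextPhysicalTime + windowDurationMs;
        boolean persisted;
        try {
            persisted = persistWindowEnd.test(windowEnd);
        } catch (RuntimeException e) {
            persisted = false; // a failed write is reported to the caller, not swallowed
        }
        if (persisted) {
            initialized.set(true);
        }
        return persisted;
    }

    boolean isInitialized() {
        return initialized.get();
    }

    public static void main(String[] args) {
        PersistThenInitSketch failing = new PersistThenInitSketch(windowEnd -> false);
        System.out.println("calibrated: " + failing.calibrate(1_000L, 5_000L)
                + ", initialized: " + failing.isInitialized());
    }
}
```

A caller such as the retry loop in `runAfterCatalogReady()` could then treat a `false` result like any other calibration failure and retry on the next iteration.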
##########
fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java:
##########
@@ -44,7 +44,7 @@ public class PersistMetaModules {
"globalFunction", "workloadGroups",
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy",
"insertOverwrite", "plsql", "dictionaryManager", "indexPolicy", "KeyManagerStore",
- "constraintManager"
+ "constraintManager", "tso"
Review Comment:
**[Info] The `"tso"` module is unconditionally registered in
`MODULE_NAMES`.**
While `saveTSO()` checks `Config.enable_tso_feature &&
Config.enable_tso_checkpoint_module` before writing data, the module is always
present. This means:
- Images written with TSO data could fail to load on older FEs that don't
know about the `"tso"` module (unless `ignore_unknown_metadata_module=true`)
- The module will be iterated during every image save/load even when TSO is
disabled
This is acceptable given the `ignore_unknown_metadata_module` safety valve
and the experimental nature, but worth noting for future production readiness.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]