This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d5c43c51b5a [branch-2.1](fe) Avoid interrupt daemon thread and use
proper polling interval (#42210) (#42646)
d5c43c51b5a is described below
commit d5c43c51b5a50efd8f5ea14cff540fdd1383da72
Author: zclllhhjj <[email protected]>
AuthorDate: Tue Oct 29 10:10:26 2024 +0800
[branch-2.1](fe) Avoid interrupt daemon thread and use proper polling
interval (#42210) (#42646)
pick https://github.com/apache/doris/pull/42210
---
.../main/java/org/apache/doris/catalog/Env.java | 3 +-
.../doris/clone/DynamicPartitionScheduler.java | 45 ++++++++++++++++++++++
.../java/org/apache/doris/common/util/Daemon.java | 11 ++++--
3 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f60f4634c0b..0d9ad091fcb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5579,8 +5579,9 @@ public class Env {
ConfigBase.setMutableConfig(key, value);
if (configtoThreads.get(key) != null) {
try {
+ // Not atomic: the daemon may become aware of the new interval with some delay, but that is acceptable.
configtoThreads.get(key).get().setInterval(Config.getField(key).getLong(null) *
1000L);
- configtoThreads.get(key).get().interrupt();
+ // Do not interrupt the thread here, to keep a possible in-progress bdbje write safe.
LOG.info("set config " + key + " to " + value);
} catch (IllegalAccessException e) {
LOG.warn("set config " + key + " failed: " + e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 1b00f041964..51dc7dd802f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.meta.MetaContext;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Strings;
@@ -88,6 +89,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
private static final String DEFAULT_RUNTIME_VALUE =
FeConstants.null_string;
+ private static final long SLEEP_PIECE = 5000L;
+
private Map<Long, Map<String, String>> runtimeInfos =
Maps.newConcurrentMap();
private Set<Pair<Long, Long>> dynamicPartitionTableInfo =
Sets.newConcurrentHashSet();
private boolean initialize;
@@ -663,6 +666,48 @@ public class DynamicPartitionScheduler extends
MasterDaemon {
initialize = true;
}
+ // Specialized schedule logic: the sleep is split into many small pieces, so if the
interval changed, it won't take too much
+ // time to become aware of it.
+ @Override
+ public void run() {
+ if (metaContext != null) { // metaContext may be null (that is its default in Daemon)
+ metaContext.setThreadLocalInfo();
+ }
+
+ while (!isStop.get()) { // keep cycling until the stop flag is set
+ try {
+ runOneCycle();
+ } catch (Throwable e) { // log and continue: a failed cycle must not end the loop
+ LOG.error("daemon thread got exception. name: {}", getName(),
e);
+ }
+
+ try {
+ long oldInterval = intervalMs; // snapshot used to detect an interval change while sleeping
+ long remainingInterval = oldInterval;
+ while (remainingInterval > SLEEP_PIECE) {
+ // Re-check the interval before each piece so that a change is noticed quickly; 5
second per wakeup is acceptable.
+ if (intervalMs != oldInterval) { // changed
+ break; // remainingInterval is still > SLEEP_PIECE here, so the final sleep below is skipped
+ }
+
+ Thread.sleep(SLEEP_PIECE);
+ remainingInterval -= SLEEP_PIECE;
+ }
+ if (remainingInterval <= SLEEP_PIECE) { // no change detected: sleep the leftover (at most SLEEP_PIECE) part
+ Thread.sleep(remainingInterval); // NOTE(review): throws IllegalArgumentException for a negative value - confirm intervalMs is never negative
+ }
+ } catch (InterruptedException e) {
+ // This thread should NEVER be interrupted: if the interrupt meets bdbje
writing, it will be disaster.
+ LOG.fatal("InterruptedException: ", e); // only logged; the loop then continues with the next cycle
+ }
+ }
+
+ if (metaContext != null) {
+ MetaContext.remove();
+ }
+ LOG.error("daemon thread exits. name=" + this.getName()); // reached only after isStop becomes true
+ }
+
@Override
protected void runAfterCatalogReady() {
if (!initialize) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
index 472285b4764..4678f78d668 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
@@ -28,12 +28,15 @@ public class Daemon extends Thread {
private static final Logger LOG = LogManager.getLogger(Daemon.class);
private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
- private long intervalMs;
- private AtomicBoolean isStop;
+ protected long intervalMs;
+
+ protected AtomicBoolean isStop;
+
+ protected MetaContext metaContext = null;
+
private Runnable runnable;
- private AtomicBoolean isStart = new AtomicBoolean(false);
- private MetaContext metaContext = null;
+ private AtomicBoolean isStart = new AtomicBoolean(false);
{
setDaemon(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]