This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5ddabea2a93 [refactor](stats) refactor collection logic and opt some 
config #26163 (#26858)
5ddabea2a93 is described below

commit 5ddabea2a936c3f37224872edb6b3216d70fa891
Author: AKIRA <[email protected]>
AuthorDate: Mon Nov 13 18:27:07 2023 +0900

    [refactor](stats) refactor collection logic and opt some config #26163 
(#26858)
    
    picked from #26163
---
 .../main/java/org/apache/doris/common/Config.java  |  34 +--
 .../java/org/apache/doris/catalog/OlapTable.java   |   7 +-
 .../doris/nereids/stats/StatsCalculator.java       |   5 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  70 ++++-
 .../org/apache/doris/statistics/AnalysisJob.java   | 193 ++++++++++++++
 .../apache/doris/statistics/AnalysisManager.java   |  90 ++-----
 .../doris/statistics/AnalysisTaskExecutor.java     |  28 +-
 .../doris/statistics/AnalysisTaskWrapper.java      |  16 +-
 .../apache/doris/statistics/BaseAnalysisTask.java  | 110 +++-----
 .../org/apache/doris/statistics/ColStatsData.java  |  14 +
 .../apache/doris/statistics/HMSAnalysisTask.java   | 135 +---------
 .../apache/doris/statistics/JdbcAnalysisTask.java  |  34 +--
 .../apache/doris/statistics/MVAnalysisTask.java    | 152 -----------
 .../apache/doris/statistics/OlapAnalysisTask.java  | 138 +---------
 .../doris/statistics/StatisticConstants.java       |  14 +-
 .../doris/statistics/StatisticsAutoCollector.java  |   9 +-
 .../doris/statistics/StatisticsCollector.java      |  11 +-
 .../statistics/StatisticsPeriodCollector.java      |  50 ----
 .../java/org/apache/doris/statistics/StatsId.java  |  15 +-
 .../doris/statistics/util/StatisticsUtil.java      |  81 +++++-
 .../apache/doris/statistics/AnalysisJobTest.java   | 233 ++++++++++-------
 .../doris/statistics/AnalysisManagerTest.java      |  37 ++-
 .../doris/statistics/AnalysisTaskExecutorTest.java |  16 +-
 .../{AnalysisJobTest.java => AnalyzeTest.java}     |   8 +-
 .../org/apache/doris/statistics/CacheTest.java     |  32 +++
 .../doris/statistics/OlapAnalysisTaskTest.java     |  74 ++++--
 .../statistics/StatisticsAutoCollectorTest.java    | 289 +++++++++++++++++++++
 .../doris/statistics/util/StatisticsUtilTest.java  |  46 +++-
 .../suites/statistics/analyze_stats.groovy         | 148 ++++++++---
 .../suites/statistics/test_agg_complex_type.groovy |  53 ++++
 30 files changed, 1265 insertions(+), 877 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a361cd7291a..142616f2a0a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -19,8 +19,6 @@ package org.apache.doris.common;
 
 import org.apache.doris.common.ExperimentalUtil.ExperimentalType;
 
-import java.util.concurrent.TimeUnit;
-
 public class Config extends ConfigBase {
 
     @ConfField(description = {"用户自定义配置文件的路径,用于存放 fe_custom.conf。该文件中的配置会覆盖 
fe.conf 中的配置",
@@ -1745,7 +1743,7 @@ public class Config extends ConfigBase {
      * Used to determined how many statistics collection SQL could run 
simultaneously.
      */
     @ConfField
-    public static int statistics_simultaneously_running_task_num = 10;
+    public static int statistics_simultaneously_running_task_num = 3;
 
     /**
      * if table has too many replicas, Fe occur oom when schema change.
@@ -2046,7 +2044,7 @@ public class Config extends ConfigBase {
      * FE OOM.
      */
     @ConfField
-    public static long stats_cache_size = 10_0000;
+    public static long stats_cache_size = 50_0000;
 
     /**
      * This configuration is used to enable the statistics of query 
information, which will record
@@ -2069,9 +2067,6 @@ public class Config extends ConfigBase {
             "Whether to enable binlog feature"})
     public static boolean enable_feature_binlog = false;
 
-    @ConfField
-    public static int analyze_task_timeout_in_hours = 12;
-
     @ConfField(mutable = true, masterOnly = true, description = {
             "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
             "Whether to disable creating catalog with WITH RESOURCE 
statement."})
@@ -2126,9 +2121,6 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean forbid_running_alter_job = false;
 
-    @ConfField
-    public static int table_stats_health_threshold = 80;
-
     @ConfField(description = {
             "暂时性配置项,开启后会自动将所有的olap表修改为可light schema change",
             "temporary config filed, will make all olap tables enable light 
schema change"
@@ -2154,28 +2146,6 @@ public class Config extends ConfigBase {
                     + "but it will increase the memory overhead."})
     public static int virtual_node_number = 2048;
 
-    @ConfField(description = {"控制对大表的自动ANALYZE的最小时间间隔,"
-            + "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
-            "This controls the minimum time interval for automatic ANALYZE on 
large tables. Within this interval,"
-                    + "tables larger than huge_table_lower_bound_size_in_bytes 
are analyzed only once."})
-    public static long huge_table_auto_analyze_interval_in_millis = 
TimeUnit.HOURS.toMillis(12);
-
-    @ConfField(description = {"定义大表的大小下界,在开启enable_auto_sample的情况下,"
-            + "大小超过该值的表将会自动通过采样收集统计信息", "This defines the lower size bound for 
large tables. "
-            + "When enable_auto_sample is enabled, tables larger than this 
value will automatically collect "
-            + "statistics through sampling"})
-    public static long huge_table_lower_bound_size_in_bytes = 5L * 1024 * 1024 
* 1024;
-
-    @ConfField(description = {"定义开启开启大表自动sample后,对大表的采样比例",
-            "This defines the number of sample percent for large tables when 
automatic sampling for"
-                    + "large tables is enabled"})
-    public static int huge_table_default_sample_rows = 4194304;
-
-    @ConfField(description = 
{"是否开启大表自动sample,开启后对于大小超过huge_table_lower_bound_size_in_bytes会自动通过采样收集"
-            + "统计信息", "Whether to enable automatic sampling for large tables, 
which, when enabled, automatically"
-            + "collects statistics through sampling for tables larger than 
'huge_table_lower_bound_size_in_bytes'"})
-    public static boolean enable_auto_sample = false;
-
     @ConfField(description = {
             "控制统计信息的自动触发作业执行记录的持久化行数",
             "Determine the persist number of automatic triggered analyze job 
execution status"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0f6ffc3cf6b..576fda217e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -53,7 +53,6 @@ import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.HistogramTask;
-import org.apache.doris.statistics.MVAnalysisTask;
 import org.apache.doris.statistics.OlapAnalysisTask;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.util.StatisticsUtil;
@@ -1102,11 +1101,9 @@ public class OlapTable extends Table {
     public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
         if (info.analysisType.equals(AnalysisType.HISTOGRAM)) {
             return new HistogramTask(info);
-        }
-        if (info.analysisType.equals(AnalysisType.FUNDAMENTALS)) {
+        } else {
             return new OlapAnalysisTask(info);
         }
-        return new MVAnalysisTask(info);
     }
 
     public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
@@ -1126,7 +1123,7 @@ public class OlapTable extends Table {
         }
         long updateRows = tblStats.updatedRows.get();
         int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
-        return tblHealth < Config.table_stats_health_threshold;
+        return tblHealth < StatisticsUtil.getTableStatsHealthThreshold();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index bf45e128d8c..4f626948407 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -571,10 +571,15 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
     }
 
     private ColumnStatistic getColumnStatistic(TableIf table, String colName) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext != null && 
connectContext.getSessionVariable().internalSession) {
+            return ColumnStatistic.UNKNOWN;
+        }
         if (totalColumnStatisticMap.get(table.getName() + colName) != null) {
             return totalColumnStatisticMap.get(table.getName() + colName);
         } else if (isPlayNereidsDump) {
             return ColumnStatistic.UNKNOWN;
+
         } else {
             long catalogId;
             long dbId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fb02d9cc7cd..7f165a8ec13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -57,6 +57,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * System variable.
@@ -412,6 +413,19 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
 
+    public static final String ENABLE_DECIMAL256 = "enable_decimal256";
+
+    public static final String STATS_INSERT_MERGE_ITEM_COUNT = 
"stats_insert_merge_item_count";
+
+    public static final String HUGE_TABLE_DEFAULT_SAMPLE_ROWS = 
"huge_table_default_sample_rows";
+    public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = 
"huge_table_lower_bound_size_in_bytes";
+
+    public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
+            = "huge_table_auto_analyze_interval_in_millis";
+
+    public static final String TABLE_STATS_HEALTH_THRESHOLD
+            = "table_stats_health_threshold";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -465,7 +479,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public int queryTimeoutS = 900;
 
     // query timeout in second.
-    @VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, needForward = true)
+    @VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, flag = VariableMgr.GLOBAL, 
needForward = true)
     public int analyzeTimeoutS = 43200;
 
     // The global max_execution_time value provides the default for the 
session value for new connections.
@@ -1156,6 +1170,12 @@ public class SessionVariable implements Serializable, 
Writable {
                     + " use a skiplist to optimize the intersection."})
     public int invertedIndexConjunctionOptThreshold = 1000;
 
+    @VariableMgr.VarAttr(name = FULL_AUTO_ANALYZE_END_TIME, needForward = 
true, checker = "checkAnalyzeTimeFormat",
+            description = {"该参数定义自动ANALYZE例程的结束时间",
+                    "This parameter defines the end time for the automatic 
ANALYZE routine."},
+            flag = VariableMgr.GLOBAL)
+    public String fullAutoAnalyzeEndTime = "23:59:59";
+
     @VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward 
= true)
     public boolean enableUniqueKeyPartialUpdate = false;
 
@@ -1177,12 +1197,6 @@ public class SessionVariable implements Serializable, 
Writable {
             flag = VariableMgr.GLOBAL)
     public String fullAutoAnalyzeStartTime = "00:00:00";
 
-    @VariableMgr.VarAttr(name = FULL_AUTO_ANALYZE_END_TIME, needForward = 
true, checker = "checkAnalyzeTimeFormat",
-            description = {"该参数定义自动ANALYZE例程的结束时间",
-                    "This parameter defines the end time for the automatic 
ANALYZE routine."},
-            flag = VariableMgr.GLOBAL)
-    public String fullAutoAnalyzeEndTime = "02:00:00";
-
     @VariableMgr.VarAttr(name = FASTER_FLOAT_CONVERT,
             description = {"是否启用更快的浮点数转换算法,注意会影响输出格式", "Set true to enable 
faster float pointer number convert"})
     public boolean fasterFloatConvert = false;
@@ -1192,6 +1206,48 @@ public class SessionVariable implements Serializable, 
Writable {
                     "the runtime filter id in IGNORE_RUNTIME_FILTER_IDS list 
will not be generated"})
 
     public String ignoreRuntimeFilterIds = "";
+
+    @VariableMgr.VarAttr(name = STATS_INSERT_MERGE_ITEM_COUNT, flag = 
VariableMgr.GLOBAL, description = {
+            "控制统计信息相关INSERT攒批数量", "Controls the batch size for stats INSERT 
merging."
+    }
+    )
+    public int statsInsertMergeItemCount = 200;
+
+    @VariableMgr.VarAttr(name = HUGE_TABLE_DEFAULT_SAMPLE_ROWS, flag = 
VariableMgr.GLOBAL, description = {
+            "定义开启开启大表自动sample后,对大表的采样比例",
+            "This defines the number of sample percent for large tables when 
automatic sampling for"
+                    + "large tables is enabled"
+
+    })
+    public long hugeTableDefaultSampleRows = 4194304;
+
+
+    @VariableMgr.VarAttr(name = HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES, flag = 
VariableMgr.GLOBAL,
+            description = {
+                    "大小超过该值的表将会自动通过采样收集统计信息",
+                    "This defines the lower size bound for large tables. "
+                            + "When enable_auto_sample is enabled, tables"
+                            + "larger than this value will automatically 
collect "
+                            + "statistics through sampling"})
+    public long hugeTableLowerBoundSizeInBytes = 5L * 1024 * 1024 * 1024;
+
+    @VariableMgr.VarAttr(name = HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, 
flag = VariableMgr.GLOBAL,
+            description = {"控制对大表的自动ANALYZE的最小时间间隔,"
+                    + 
"在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
+                    "This controls the minimum time interval for automatic 
ANALYZE on large tables."
+                            + "Within this interval,"
+                            + "tables larger than 
huge_table_lower_bound_size_in_bytes are analyzed only once."})
+    public long hugeTableAutoAnalyzeIntervalInMillis = 
TimeUnit.HOURS.toMillis(12);
+
+    @VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = 
VariableMgr.GLOBAL,
+            description = {"取值在0-100之间,当自上次统计信息收集操作之后"
+                    + "数据更新量达到 (100 - table_stats_health_threshold)% 
,认为该表的统计信息已过时",
+                    "The value should be between 0 and 100. When the data 
update quantity "
+                            + "exceeds (100 - table_stats_health_threshold)% 
since the last "
+                            + "statistics collection operation, the statistics 
for this table are"
+                            + "considered outdated."})
+    public int tableStatsHealthThreshold = 60;
+
     public static final String IGNORE_RUNTIME_FILTER_IDS = 
"ignore_runtime_filter_ids";
 
     public Set<Integer> getIgnoredRuntimeFilterIds() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
new file mode 100644
index 00000000000..904dc21e337
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.qe.AuditLogHelper;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringJoiner;
+
+public class AnalysisJob {
+
+    public static final Logger LOG = LogManager.getLogger(AnalysisJob.class);
+
+    protected Set<BaseAnalysisTask> queryingTask;
+
+    protected Set<BaseAnalysisTask> queryFinished;
+
+    protected List<ColStatsData> buf;
+
+    protected int totalTaskCount;
+
+    protected int queryFinishedTaskCount;
+
+    protected StmtExecutor stmtExecutor;
+
+    protected boolean killed;
+
+    protected long start;
+
+    protected AnalysisInfo jobInfo;
+
+    protected AnalysisManager analysisManager;
+
+    public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends 
BaseAnalysisTask> queryingTask) {
+        for (BaseAnalysisTask task : queryingTask) {
+            task.job = this;
+        }
+        this.queryingTask = new HashSet<>(queryingTask);
+        this.queryFinished = new HashSet<>();
+        this.buf = new ArrayList<>();
+        totalTaskCount = queryingTask.size();
+        start = System.currentTimeMillis();
+        this.jobInfo = jobInfo;
+        this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
+    }
+
+    public synchronized void appendBuf(BaseAnalysisTask task, 
List<ColStatsData> statsData) {
+        queryingTask.remove(task);
+        buf.addAll(statsData);
+        queryFinished.add(task);
+        queryFinishedTaskCount += 1;
+        if (queryFinishedTaskCount == totalTaskCount) {
+            writeBuf();
+            updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+                    + (System.currentTimeMillis() - start) / 1000);
+            deregisterJob();
+        } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
+            writeBuf();
+        }
+    }
+
+    // CHECKSTYLE OFF
+    // fallthrough here is expected
+    public void updateTaskState(AnalysisState state, String msg) {
+        long time = System.currentTimeMillis();
+        switch (state) {
+            case FAILED:
+                for (BaseAnalysisTask task : queryingTask) {
+                    analysisManager.updateTaskStatus(task.info, state, msg, 
time);
+                    task.cancel();
+                }
+                killed = true;
+            case FINISHED:
+                for (BaseAnalysisTask task : queryFinished) {
+                    analysisManager.updateTaskStatus(task.info, state, msg, 
time);
+                }
+            default:
+                // DO NOTHING
+        }
+    }
+
+    protected void writeBuf() {
+        if (killed) {
+            return;
+        }
+        // buf could be empty when nothing need to do, for example user submit 
an analysis task for table with no data
+        // change
+        if (!buf.isEmpty())  {
+            String insertStmt = "INSERT INTO " + 
StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
+            StringJoiner values = new StringJoiner(",");
+            for (ColStatsData data : buf) {
+                values.add(data.toSQL(true));
+            }
+            insertStmt += values.toString();
+            int retryTimes = 0;
+            while (retryTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+                if (killed) {
+                    return;
+                }
+                try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext(false)) {
+                    stmtExecutor = new StmtExecutor(r.connectContext, 
insertStmt);
+                    executeWithExceptionOnFail(stmtExecutor);
+                    break;
+                } catch (Exception t) {
+                    LOG.warn("Failed to write buf: " + insertStmt, t);
+                    retryTimes++;
+                    if (retryTimes >= 
StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+                        updateTaskState(AnalysisState.FAILED, t.getMessage());
+                        return;
+                    }
+                }
+            }
+        }
+        updateTaskState(AnalysisState.FINISHED, "");
+        syncLoadStats();
+        queryFinished.clear();
+    }
+
+    protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) 
throws Exception {
+        if (killed) {
+            return;
+        }
+        LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
+        try {
+            stmtExecutor.execute();
+            QueryState queryState = stmtExecutor.getContext().getState();
+            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                throw new RuntimeException(
+                        "Failed to insert : " + 
stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+                                + queryState.getErrorMessage());
+            }
+        } finally {
+            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), 
stmtExecutor.getOriginStmt().toString(),
+                    stmtExecutor.getParsedStmt(), 
stmtExecutor.getQueryStatisticsForAuditLog(),
+                    true);
+        }
+    }
+
+    public void taskFailed(BaseAnalysisTask task, String reason) {
+        updateTaskState(AnalysisState.FAILED, reason);
+        cancel();
+        deregisterJob();
+    }
+
+    public void cancel() {
+        for (BaseAnalysisTask task : queryingTask) {
+            task.cancel();
+        }
+    }
+
+    public void deregisterJob() {
+        analysisManager.removeJob(jobInfo.jobId);
+    }
+
+    protected void syncLoadStats() {
+        long tblId = jobInfo.tblId;
+        for (BaseAnalysisTask task : queryFinished) {
+            String colName = task.col.getName();
+            if 
(!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, 
colName)) {
+                analysisManager.removeColStatsStatus(tblId, colName);
+            }
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index b477a23680e..83441f4b2dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.Daemon;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -101,7 +100,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-public class AnalysisManager extends Daemon implements Writable {
+public class AnalysisManager implements Writable {
 
     private static final Logger LOG = 
LogManager.getLogger(AnalysisManager.class);
 
@@ -113,11 +112,11 @@ public class AnalysisManager extends Daemon implements 
Writable {
     private AnalysisTaskExecutor taskExecutor;
 
     // Store task information in metadata.
-    private final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
+    protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
             Collections.synchronizedNavigableMap(new TreeMap<>());
 
     // Store job information in metadata.
-    private final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
+    protected final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
             Collections.synchronizedNavigableMap(new TreeMap<>());
 
     // Tracking system submitted job, keep in mem only
@@ -128,6 +127,8 @@ public class AnalysisManager extends Daemon implements 
Writable {
 
     private final Map<Long, TableStatsMeta> idToTblStats = new 
ConcurrentHashMap<>();
 
+    private final Map<Long, AnalysisJob> idToAnalysisJob = new 
ConcurrentHashMap<>();
+
     protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, 
this);
 
     private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w 
-> {
@@ -237,7 +238,6 @@ public class AnalysisManager extends Daemon implements 
Writable {
             new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
 
     public AnalysisManager() {
-        
super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
         if (!Env.isCheckpointThread()) {
             this.taskExecutor = new 
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
             this.statisticsCache = new StatisticsCache();
@@ -245,44 +245,6 @@ public class AnalysisManager extends Daemon implements 
Writable {
         }
     }
 
-    @Override
-    protected void runOneCycle() {
-        clear();
-    }
-
-    private void clear() {
-        clearExpiredAnalysisInfo(analysisJobInfoMap, (a) ->
-                        a.scheduleType.equals(ScheduleType.ONCE)
-                                && System.currentTimeMillis() - 
a.lastExecTimeInMs
-                                > 
TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
-                (id) -> {
-                    Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new 
AnalyzeDeletionLog(id));
-                    return null;
-                });
-        clearExpiredAnalysisInfo(analysisTaskInfoMap, (a) -> 
System.currentTimeMillis() - a.lastExecTimeInMs
-                        > 
TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
-                (id) -> {
-                    Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new 
AnalyzeDeletionLog(id));
-                    return null;
-                });
-    }
-
-    private void clearExpiredAnalysisInfo(Map<Long, AnalysisInfo> infoMap, 
Predicate<AnalysisInfo> isExpired,
-            Function<Long, Void> writeLog) {
-        synchronized (infoMap) {
-            List<Long> expired = new ArrayList<>();
-            for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
-                if (isExpired.test(entry.getValue())) {
-                    expired.add(entry.getKey());
-                }
-            }
-            for (Long k : expired) {
-                infoMap.remove(k);
-                writeLog.apply(k);
-            }
-        }
-    }
-
     public StatisticsCache getStatisticsCache() {
         return statisticsCache;
     }
@@ -371,6 +333,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
         boolean isSync = stmt.isSync();
         Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
         createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
+        constructJob(jobInfo, analysisTaskInfos.values());
         if (!jobInfo.partitionOnly && stmt.isAllColumns()
                 && StatisticsUtil.isExternalTable(jobInfo.catalogId, 
jobInfo.dbId, jobInfo.tblId)) {
             createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, 
isSync);
@@ -446,7 +409,6 @@ public class AnalysisManager extends Daemon implements 
Writable {
      */
     private Map<String, Set<String>> validateAndGetPartitions(TableIf table, 
Set<String> columnNames,
             Set<String> partitionNames, AnalysisType analysisType) throws 
DdlException {
-        long tableId = table.getId();
 
         Map<String, Set<String>> columnToPartitions = columnNames.stream()
                 .collect(Collectors.toMap(
@@ -467,27 +429,6 @@ public class AnalysisManager extends Daemon implements 
Writable {
             return columnToPartitions;
         }
 
-        // Get the partition granularity statistics that have been collected
-        Map<String, Set<String>> existColAndPartsForStats = 
StatisticsRepository
-                .fetchColAndPartsForStats(tableId);
-
-        if (existColAndPartsForStats.isEmpty()) {
-            // There is no historical statistical information, no need to do 
validation
-            return columnToPartitions;
-        }
-
-        Set<String> existPartIdsForStats = new HashSet<>();
-        
existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
-        Set<String> idToPartition = StatisticsUtil.getPartitionIds(table);
-        // Get an invalid set of partitions (those partitions were deleted)
-        Set<String> invalidPartIds = existPartIdsForStats.stream()
-                .filter(id -> 
!idToPartition.contains(id)).collect(Collectors.toSet());
-
-        if (!invalidPartIds.isEmpty()) {
-            // Delete invalid partition statistics to avoid affecting table 
statistics
-            StatisticsRepository.dropStatistics(invalidPartIds);
-        }
-
         if (analysisType == AnalysisType.FUNDAMENTALS) {
             Map<String, Set<String>> result = 
table.findReAnalyzeNeededPartitions();
             result.keySet().retainAll(columnNames);
@@ -720,11 +661,12 @@ public class AnalysisManager extends Daemon implements 
Writable {
     public void syncExecute(Collection<BaseAnalysisTask> tasks) {
         SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks);
         ConnectContext ctx = ConnectContext.get();
+        ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
         try {
             ctxToSyncTask.put(ctx, syncTaskCollection);
-            ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
             syncTaskCollection.execute(syncExecPool);
         } finally {
+            syncExecPool.shutdown();
             ctxToSyncTask.remove(ctx);
         }
     }
@@ -737,7 +679,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
                 new SynchronousQueue(),
                 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC 
ANALYZE" + "-%d")
                         .build(), new BlockedPolicy(poolName,
-                (int) 
TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
+                StatisticsUtil.getAnalyzeTimeout()));
     }
 
     public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
@@ -759,6 +701,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
             for (String col : cols) {
                 Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, 
-1L, col);
             }
+            tableStats.updatedTime = 0;
         }
         logCreateTableStats(tableStats);
         StatisticsRepository.dropStatistics(tblId, cols);
@@ -1128,4 +1071,17 @@ public class AnalysisManager extends Daemon implements 
Writable {
         }
         return tableStats.findColumnStatsMeta(colName);
     }
+
+    public AnalysisJob findJob(long id) {
+        return idToAnalysisJob.get(id);
+    }
+
+    public void constructJob(AnalysisInfo jobInfo, Collection<? extends 
BaseAnalysisTask> tasks) {
+        AnalysisJob job = new AnalysisJob(jobInfo, tasks);
+        idToAnalysisJob.put(jobInfo.jobId, job);
+    }
+
+    public void removeJob(long id) {
+        idToAnalysisJob.remove(id);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 4b133ce0ebf..58bae9fe66b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -18,9 +18,9 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,7 +36,7 @@ public class AnalysisTaskExecutor extends Thread {
 
     private static final Logger LOG = 
LogManager.getLogger(AnalysisTaskExecutor.class);
 
-    private final ThreadPoolExecutor executors;
+    protected final ThreadPoolExecutor executors;
 
     private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
             new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
@@ -72,18 +72,22 @@ public class AnalysisTaskExecutor extends Thread {
 
     private void doCancelExpiredJob() {
         for (;;) {
+            tryToCancel();
+        }
+    }
+
+    protected void tryToCancel() {
+        try {
+            AnalysisTaskWrapper taskWrapper = taskQueue.take();
             try {
-                AnalysisTaskWrapper taskWrapper = taskQueue.take();
-                try {
-                    long timeout = 
TimeUnit.HOURS.toMillis(Config.analyze_task_timeout_in_hours)
-                            - (System.currentTimeMillis() - 
taskWrapper.getStartTime());
-                    taskWrapper.get(timeout < 0 ? 0 : timeout, 
TimeUnit.MILLISECONDS);
-                } catch (Exception e) {
-                    taskWrapper.cancel(e.getMessage());
-                }
-            } catch (Throwable throwable) {
-                LOG.warn(throwable);
+                long timeout = 
TimeUnit.SECONDS.toMillis(StatisticsUtil.getAnalyzeTimeout())
+                        - (System.currentTimeMillis() - 
taskWrapper.getStartTime());
+                taskWrapper.get(timeout < 0 ? 0 : timeout, 
TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                taskWrapper.cancel(e.getMessage());
             }
+        } catch (Throwable throwable) {
+            LOG.warn("cancel analysis task failed", throwable);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index 9aa3d85992b..ffdd375ee9e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -59,9 +58,8 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
             if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && 
!StatisticsUtil.inAnalyzeTime(
                     LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
                 // TODO: Do we need a separate AnalysisState here?
-                Env.getCurrentEnv().getAnalysisManager()
-                        .updateTaskStatus(task.info, AnalysisState.FAILED, 
"Auto task"
-                                + "doesn't get executed within specified time 
range", System.currentTimeMillis());
+                task.job.taskFailed(task, "Auto task"
+                                + "doesn't get executed within specified time 
range");
                 return;
             }
             executor.putJob(this);
@@ -76,15 +74,7 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
             if (!task.killed) {
                 if (except != null) {
                     LOG.warn("Analyze {} failed.", task.toString(), except);
-                    Env.getCurrentEnv().getAnalysisManager()
-                            .updateTaskStatus(task.info,
-                                    AnalysisState.FAILED, 
Util.getRootCauseMessage(except), System.currentTimeMillis());
-                } else {
-                    LOG.debug("Analyze {} finished, cost time:{}", 
task.toString(),
-                            System.currentTimeMillis() - startTime);
-                    Env.getCurrentEnv().getAnalysisManager()
-                            .updateTaskStatus(task.info,
-                                    AnalysisState.FINISHED, "", 
System.currentTimeMillis());
+                    task.job.taskFailed(task, 
Util.getRootCauseMessage(except));
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 4f7d588de73..3fcebd6c38b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -22,14 +22,12 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.qe.AuditLogHelper;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.DBObjects;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -38,6 +36,7 @@ import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 public abstract class BaseAnalysisTask {
@@ -52,59 +51,25 @@ public abstract class BaseAnalysisTask {
             + "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
             ;
 
-    /**
-     * Stats stored in the column_statistics table basically has two types, 
`part_id` is null which means it is
-     * aggregate from partition level stats, `part_id` is not null which means 
it is partition level stats.
-     * For latter, it's id field contains part id, for previous doesn't.
-     */
-    protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
-            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', 
${partId}) AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "${idxId} AS idx_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
-            + "MIN(`${colName}`) AS min, "
-            + "MAX(`${colName}`) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW() ";
-
-    protected static final String INSERT_COL_STATISTICS = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + "    SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id, 
part_id, row_count, "
-            + "        ndv, null_count,"
-            + " to_base64(CAST(min AS string)), to_base64(CAST(max AS 
string)), data_size, update_time\n"
-            + "    FROM \n"
-            + "     (SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') 
AS id, "
+    protected static final String COLLECT_COL_STATISTICS =
+            "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "         ${catalogId} AS catalog_id, "
             + "         ${dbId} AS db_id, "
             + "         ${tblId} AS tbl_id, "
             + "         ${idxId} AS idx_id, "
             + "         '${colId}' AS col_id, "
             + "         NULL AS part_id, "
-            + "         SUM(count) AS row_count, \n"
-            + "         SUM(null_count) AS null_count, "
-            + "         MIN(CAST(from_base64(min) AS ${type})) AS min, "
-            + "         MAX(CAST(from_base64(max) AS ${type})) AS max, "
-            + "         SUM(data_size_in_bytes) AS data_size, "
-            + "         NOW() AS update_time \n"
-            + "     FROM ${internalDB}.${columnStatTbl}"
-            + "     WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND 
"
-            + "     ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
-            + "     ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
-            + "     ${internalDB}.${columnStatTbl}.idx_id='${idxId}' AND "
-            + "     ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
-            + "     ) t1, \n";
-
-    protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = "INSERT 
INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
+            + "         COUNT(1) AS row_count, "
+            + "         NDV(`${colName}`) AS ndv, "
+            + "         COUNT(1) - COUNT(${colName}) AS null_count, "
+            + "         CAST(MIN(${colName}) AS STRING) AS min, "
+            + "         CAST(MAX(${colName}) AS STRING) AS max, "
+            + "         ${dataSizeFunction} AS data_size, "
+            + "         NOW() AS update_time "
+            + " FROM `${dbName}`.`${tblName}`";
+
+    protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE =
+            " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
             + "${dbId} AS db_id, "
@@ -115,8 +80,8 @@ public abstract class BaseAnalysisTask {
             + "${row_count} AS row_count, "
             + "${ndv} AS ndv, "
             + "${null_count} AS null_count, "
-            + "to_base64('${min}') AS min, "
-            + "to_base64('${max}') AS max, "
+            + "'${min}' AS min, "
+            + "'${max}' AS max, "
             + "${data_size} AS data_size, "
             + "NOW() ";
 
@@ -136,6 +101,8 @@ public abstract class BaseAnalysisTask {
 
     protected TableSample tableSample = null;
 
+    protected AnalysisJob job;
+
     @VisibleForTesting
     public BaseAnalysisTask() {
 
@@ -192,6 +159,7 @@ public abstract class BaseAnalysisTask {
                 }
                 LOG.warn("Failed to execute analysis task, retried times: {}", 
retriedTimes++, t);
                 if (retriedTimes > 
StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+                    job.taskFailed(this, t.getMessage());
                     throw new RuntimeException(t);
                 }
                 StatisticsUtil.sleep(TimeUnit.SECONDS.toMillis(2 ^ 
retriedTimes) * 10);
@@ -266,11 +234,10 @@ public abstract class BaseAnalysisTask {
             return new TableSample(true, (long) info.samplePercent);
         } else if (info.sampleRows > 0) {
             return new TableSample(false, info.sampleRows);
-        } else if (info.analysisMethod == AnalysisMethod.FULL
-                && Config.enable_auto_sample
-                && tbl.getDataSize(true) > 
Config.huge_table_lower_bound_size_in_bytes) {
+        } else if (info.jobType.equals(JobType.SYSTEM) && info.analysisMethod 
== AnalysisMethod.FULL
+                && tbl.getDataSize(true) > 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()) {
             // If user doesn't specify sample percent/rows, use auto sample 
and update sample rows in analysis info.
-            return new TableSample(false, (long) 
Config.huge_table_default_sample_rows);
+            return new TableSample(false, 
StatisticsUtil.getHugeTableSampleRows());
         } else {
             return null;
         }
@@ -283,23 +250,20 @@ public abstract class BaseAnalysisTask {
                 col == null ? "TableRowCount" : col.getName());
     }
 
-    protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) 
throws Exception {
-        if (killed) {
-            return;
-        }
-        LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
-        try {
-            stmtExecutor.execute();
-            QueryState queryState = stmtExecutor.getContext().getState();
-            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                throw new RuntimeException(String.format("Failed to analyze 
%s.%s.%s, error: %s sql: %s",
-                        catalog.getName(), db.getFullName(), info.colName, 
stmtExecutor.getOriginStmt().toString(),
-                        queryState.getErrorMessage()));
-            }
+    public void setJob(AnalysisJob job) {
+        this.job = job;
+    }
+
+    protected void runQuery(String sql) {
+        long startTime = System.currentTimeMillis();
+        try (AutoCloseConnectContext a  = 
StatisticsUtil.buildConnectContext()) {
+            stmtExecutor = new StmtExecutor(a.connectContext, sql);
+            stmtExecutor.executeInternalQuery();
+            ColStatsData colStatsData = new 
ColStatsData(stmtExecutor.executeInternalQuery().get(0));
+            job.appendBuf(this, Collections.singletonList(colStatsData));
         } finally {
-            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), 
stmtExecutor.getOriginStmt().toString(),
-                    stmtExecutor.getParsedStmt(), 
stmtExecutor.getQueryStatisticsForAuditLog(),
-                    true);
+            LOG.debug("End cost time in secs: " + (System.currentTimeMillis() 
- startTime) / 1000);
         }
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
index 6c94326a942..41936232afd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
@@ -19,6 +19,8 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.statistics.util.StatisticsUtil;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.StringJoiner;
@@ -54,6 +56,18 @@ public class ColStatsData {
 
     public final String updateTime;
 
+    @VisibleForTesting
+    public ColStatsData() {
+        statsId = new StatsId();
+        count = 0;
+        ndv = 0;
+        nullCount = 0;
+        minLit = null;
+        maxLit = null;
+        dataSizeInBytes = 0;
+        updateTime = null;
+    }
+
     public ColStatsData(ResultRow row) {
         this.statsId = new StatsId(row);
         this.count = (long) Double.parseDouble(row.get(7));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 4583237f8c6..049e80d52fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -23,26 +23,19 @@ import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.external.hive.util.HiveUtil;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 public class HMSAnalysisTask extends BaseAnalysisTask {
@@ -51,9 +44,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
     // While doing sample analysis, the sampled ndv result will multiply a 
factor (total size/sample size)
     // if ndv(col)/count(col) is greater than this threshold.
 
-    private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
+    private static final String ANALYZE_TABLE_TEMPLATE = " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
             + "${dbId} AS db_id, "
@@ -70,28 +61,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "NOW() "
             + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
-    private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
-            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', 
${partId}) AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "${idxId} AS idx_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
-            + "to_base64(MIN(`${colName}`)) AS min, "
-            + "to_base64(MAX(`${colName}`)) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";
-
     private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT 
ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
             + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
-    // cache stats for each partition, it would be inserted into 
column_statistics in a batch.
-    private final List<List<ColStatsData>> buf = new ArrayList<>();
-
     private final boolean isTableLevelTask;
     private final boolean isPartitionOnly;
     private Set<String> partitionNames;
@@ -131,25 +103,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
      * Get column statistics and insert the result to 
__internal_schema.column_statistics
      */
     private void getTableColumnStats() throws Exception {
-        if (isPartitionOnly) {
-            getPartitionNames();
-            List<String> partitionAnalysisSQLs = new ArrayList<>();
-            for (String partId : this.partitionNames) {
-                partitionAnalysisSQLs.add(generateSqlForPartition(partId));
-            }
-            execSQLs(partitionAnalysisSQLs);
-        } else {
-            if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
-                try {
-                    getPartitionColumnStats();
-                } catch (Exception e) {
-                    LOG.warn("Failed to collect stats for partition col {} 
using metadata, "
-                            + "fallback to normal collection", col.getName(), 
e);
-                    getOrdinaryColumnStats();
-                }
-            } else {
+        if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
+            try {
+                getPartitionColumnStats();
+            } catch (Exception e) {
+                LOG.warn("Failed to collect stats for partition col {} using 
metadata, "
+                        + "fallback to normal collection", col.getName(), e);
                 getOrdinaryColumnStats();
             }
+        } else {
+            getOrdinaryColumnStats();
         }
     }
 
@@ -182,7 +145,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         params.put("maxFunction", getMaxFunction());
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
-        executeInsertSql(sql);
+        runQuery(sql);
     }
 
     private void getPartitionColumnStats() throws Exception {
@@ -227,7 +190,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         params.put("data_size", String.valueOf(dataSize));
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = 
stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
-        executeInsertSql(sql);
+        runQuery(sql);
     }
 
     private String updateMinValue(String currentMin, String value) {
@@ -278,7 +241,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
                 partitionNames = table.getPartitionNames();
             } else if (info.partitionCount > 0) {
                 partitionNames = table.getPartitionNames().stream()
-                    .limit(info.partitionCount).collect(Collectors.toSet());
+                        
.limit(info.partitionCount).collect(Collectors.toSet());
             }
             if (partitionNames == null || partitionNames.isEmpty()) {
                 throw new RuntimeException("Not a partition table or no 
partition specified.");
@@ -286,80 +249,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         }
     }
 
-    private String generateSqlForPartition(String partId) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(ANALYZE_PARTITION_TEMPLATE);
-        String[] splits = partId.split("/");
-        for (int i = 0; i < splits.length; i++) {
-            String[] kv = splits[i].split("=");
-            sb.append(kv[0]);
-            sb.append("='");
-            sb.append(kv[1]);
-            sb.append("'");
-            if (i < splits.length - 1) {
-                sb.append(" and ");
-            }
-        }
-        Map<String, String> params = buildStatsParams(partId);
-        params.put("dataSizeFunction", getDataSizeFunction(col));
-        return new StringSubstitutor(params).replace(sb.toString());
-    }
-
-    public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
-        long startTime = System.currentTimeMillis();
-        LOG.debug("analyze task {} start at {}", info.toString(), new Date());
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
-            List<List<String>> sqlGroups = 
Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
-            for (List<String> group : sqlGroups) {
-                if (killed) {
-                    return;
-                }
-                StringJoiner partitionCollectSQL = new StringJoiner("UNION 
ALL");
-                group.forEach(partitionCollectSQL::add);
-                stmtExecutor = new StmtExecutor(r.connectContext, 
partitionCollectSQL.toString());
-                buf.add(stmtExecutor.executeInternalQuery()
-                        
.stream().map(ColStatsData::new).collect(Collectors.toList()));
-                QueryState queryState = r.connectContext.getState();
-                if 
(queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
-                    throw new RuntimeException(String.format("Failed to 
analyze %s.%s.%s, error: %s sql: %s",
-                        catalog.getName(), db.getFullName(), info.colName, 
partitionCollectSQL,
-                        queryState.getErrorMessage()));
-                }
-            }
-            for (List<ColStatsData> colStatsDataList : buf) {
-                StringBuilder batchInsertSQL =
-                        new StringBuilder("INSERT INTO " + 
StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
-                        + " VALUES ");
-                StringJoiner sj = new StringJoiner(",");
-                colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
-                batchInsertSQL.append(sj);
-                stmtExecutor = new StmtExecutor(r.connectContext, 
batchInsertSQL.toString());
-                executeWithExceptionOnFail(stmtExecutor);
-            }
-        } finally {
-            LOG.debug("analyze task {} end. cost {}ms", info, 
System.currentTimeMillis() - startTime);
-        }
-
-    }
-
-    private void executeInsertSql(String sql) throws Exception {
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            this.stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if 
(queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
-                LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], 
error: [%s]",
-                        catalog.getName(), db.getFullName(), info.colName, 
sql, queryState.getErrorMessage()));
-                throw new RuntimeException(queryState.getErrorMessage());
-            }
-            LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d 
ms.",
-                    catalog.getName(), db.getFullName(), info.colName, sql, 
(System.currentTimeMillis() - startTime)));
-        }
-    }
-
     private Map<String, String> buildStatsParams(String partId) {
         Map<String, String> commonParams = new HashMap<>();
         String id = StatisticsUtil.constructId(tbl.getId(), -1);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
index 5ae66d292dc..649b075c673 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
@@ -20,25 +20,17 @@ package org.apache.doris.statistics;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.JdbcExternalTable;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import org.apache.commons.text.StringSubstitutor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class JdbcAnalysisTask extends BaseAnalysisTask {
-    private static final Logger LOG = 
LogManager.getLogger(JdbcAnalysisTask.class);
 
-    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
+    private static final String ANALYZE_SQL_TABLE_TEMPLATE = " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
             + "${dbId} AS db_id, "
@@ -49,8 +41,8 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
             + "COUNT(1) AS row_count, "
             + "NDV(`${colName}`) AS ndv, "
             + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
-            + "to_base64(MIN(`${colName}`)) AS min, "
-            + "to_base64(MAX(`${colName}`)) AS max, "
+            + "MIN(`${colName}`) AS min, "
+            + "MAX(`${colName}`) AS max, "
             + "${dataSizeFunction} AS data_size, "
             + "NOW() "
             + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
@@ -117,25 +109,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
         params.put("dataSizeFunction", getDataSizeFunction(col));
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
-        executeInsertSql(sql);
-    }
-
-    private void executeInsertSql(String sql) throws Exception {
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            this.stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if 
(queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
-                LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], 
error: [%s]",
-                        catalog.getName(), db.getFullName(), info.colName, 
sql, queryState.getErrorMessage()));
-                throw new RuntimeException(queryState.getErrorMessage());
-            }
-            LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d 
ms.",
-                    catalog.getName(), db.getFullName(), info.colName, sql, 
(System.currentTimeMillis() - startTime)));
-        }
+        runQuery(sql);
     }
 
     private Map<String, String> buildTableStatsParams(String partId) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
deleted file mode 100644
index 6a43c5092fa..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.analysis.CreateMaterializedViewStmt;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.FunctionCallExpr;
-import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SelectListItem;
-import org.apache.doris.analysis.SelectStmt;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.TableRef;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndexMeta;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import com.google.common.base.Preconditions;
-
-import java.io.StringReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Analysis for the materialized view, only gets constructed when the 
AnalyzeStmt is not set which
- * columns to be analyzed.
- * TODO: Supports multi-table mv
- */
-public class MVAnalysisTask extends BaseAnalysisTask {
-
-    private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS
-            + " FROM (${sql}) mv ${sampleExpr}";
-
-    private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS
-            + "     (SELECT NDV(`${colName}`) AS ndv "
-            + "     FROM (${sql}) mv) t2";
-
-    private MaterializedIndexMeta meta;
-
-    private SelectStmt selectStmt;
-
-    private OlapTable olapTable;
-
-    public MVAnalysisTask(AnalysisInfo info) {
-        super(info);
-        init();
-    }
-
-    private void init() {
-        olapTable = (OlapTable) tbl;
-        meta = olapTable.getIndexMetaByIndexId(info.indexId);
-        Preconditions.checkState(meta != null);
-        String mvDef = meta.getDefineStmt().originStmt;
-        SqlScanner input =
-                new SqlScanner(new StringReader(mvDef), 0L);
-        SqlParser parser = new SqlParser(input);
-        CreateMaterializedViewStmt cmv = null;
-        try {
-            cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 
0);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        selectStmt = cmv.getSelectStmt();
-        selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName());
-    }
-
-    @Override
-    public void doExecute() throws Exception {
-        for (Column column : meta.getSchema()) {
-            SelectStmt selectOne = (SelectStmt) selectStmt.clone();
-            TableRef tableRef = selectOne.getTableRefs().get(0);
-            SelectListItem selectItem = selectOne.getSelectList().getItems()
-                    .stream()
-                    .filter(i -> isCorrespondingToColumn(i, column))
-                    .findFirst()
-                    .get();
-            selectItem.setAlias(column.getName());
-            Map<String, String> params = new HashMap<>();
-            for (String partName : tbl.getPartitionNames()) {
-                PartitionNames partitionName = new PartitionNames(false,
-                        Collections.singletonList(partName));
-                tableRef.setPartitionNames(partitionName);
-                String sql = selectOne.toSql();
-                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-                params.put("columnStatTbl", 
StatisticConstants.STATISTIC_TBL_NAME);
-                params.put("catalogId", String.valueOf(catalog.getId()));
-                params.put("dbId", String.valueOf(db.getId()));
-                params.put("tblId", String.valueOf(tbl.getId()));
-                params.put("idxId", String.valueOf(meta.getIndexId()));
-                String colName = column.getName();
-                params.put("colId", colName);
-                String partId = olapTable.getPartition(partName) == null ? 
"NULL" :
-                        
String.valueOf(olapTable.getPartition(partName).getId());
-                params.put("partId", partId);
-                params.put("dataSizeFunction", getDataSizeFunction(column));
-                params.put("dbName", db.getFullName());
-                params.put("colName", colName);
-                params.put("tblName", tbl.getName());
-                params.put("sql", sql);
-                StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
-            }
-            params.remove("partId");
-            params.remove("sampleExpr");
-            params.put("type", column.getType().toString());
-            StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
-            Env.getCurrentEnv().getStatisticsCache()
-                    .refreshColStatsSync(meta.getIndexId(), meta.getIndexId(), 
column.getName());
-        }
-    }
-
-    //  Based on the fact that materialized view create statement's select 
expr only contains basic SlotRef and
-    //  AggregateFunction.
-    private boolean isCorrespondingToColumn(SelectListItem item, Column 
column) {
-        Expr expr = item.getExpr();
-        if (expr instanceof SlotRef) {
-            SlotRef slotRef = (SlotRef) expr;
-            return slotRef.getColumnName().equalsIgnoreCase(column.getName());
-        }
-        if (expr instanceof FunctionCallExpr) {
-            FunctionCallExpr func = (FunctionCallExpr) expr;
-            SlotRef slotRef = (SlotRef) func.getChild(0);
-            return slotRef.getColumnName().equalsIgnoreCase(column.getName());
-        }
-        return false;
-    }
-
-    @Override
-    protected void afterExecution() {
-        // DO NOTHING
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 185a582cde4..b0c4b0b6c0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -22,28 +22,21 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
-import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import org.apache.commons.text.StringSubstitutor;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 /**
@@ -51,29 +44,6 @@ import java.util.stream.Collectors;
  */
 public class OlapAnalysisTask extends BaseAnalysisTask {
 
-    // TODO Currently, NDV is computed for the full table; in fact,
-    //  NDV should only be computed for the relevant partition.
-    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = 
INSERT_COL_STATISTICS
-            + "     (SELECT NDV(`${colName}`) AS ndv "
-            + "     FROM `${dbName}`.`${tblName}`) t2";
-
-    private static final String COLLECT_PARTITION_STATS_SQL_TEMPLATE =
-            " SELECT "
-                    + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', 
${partId}) AS id, "
-                    + "${catalogId} AS catalog_id, "
-                    + "${dbId} AS db_id, "
-                    + "${tblId} AS tbl_id, "
-                    + "${idxId} AS idx_id, "
-                    + "'${colId}' AS col_id, "
-                    + "${partId} AS part_id, "
-                    + "COUNT(1) AS row_count, "
-                    + "NDV(`${colName}`) AS ndv, "
-                    + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) 
AS null_count, "
-                    + "MIN(`${colName}`) AS min, "
-                    + "MAX(`${colName}`) AS max, "
-                    + "${dataSizeFunction} AS data_size, "
-                    + "NOW() FROM `${dbName}`.`${tblName}` PARTITION 
${partitionName}";
-
     private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
@@ -92,9 +62,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
             + "FROM `${dbName}`.`${tblName}`"
             + "${tablets}";
 
-    // cache stats for each partition, it would be inserted into 
column_statistics in a batch.
-    private final List<List<ColStatsData>> buf = new ArrayList<>();
-
     @VisibleForTesting
     public OlapAnalysisTask() {
     }
@@ -148,45 +115,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
             stmtExecutor = new StmtExecutor(r.connectContext, 
stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE));
             // Scalar query only return one row
             ColStatsData colStatsData = new 
ColStatsData(stmtExecutor.executeInternalQuery().get(0));
-            OlapTable olapTable = (OlapTable) tbl;
-            Collection<Partition> partitions = olapTable.getPartitions();
-            int partitionCount = partitions.size();
-            List<String> values = partitions.stream().map(p -> String.format(
-                    "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 
NOW())",
-                    
StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName(), 
p.getId())),
-                    InternalCatalog.INTERNAL_CATALOG_ID,
-                    db.getId(),
-                    tbl.getId(),
-                    -1,
-                    StatisticsUtil.quote(col.getName()),
-                    p.getId(),
-                    colStatsData.count / partitionCount,
-                    colStatsData.ndv / partitionCount,
-                    colStatsData.nullCount / partitionCount,
-                    StatisticsUtil.quote(colStatsData.minLit),
-                    StatisticsUtil.quote(colStatsData.maxLit),
-                    colStatsData.dataSizeInBytes / 
partitionCount)).collect(Collectors.toList());
-            values.add(String.format(
-                    "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 
NOW())",
-                    
StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, 
col.getName())),
-                    InternalCatalog.INTERNAL_CATALOG_ID,
-                    db.getId(),
-                    tbl.getId(),
-                    -1,
-                    StatisticsUtil.quote(col.getName()),
-                    "NULL",
-                    colStatsData.count,
-                    colStatsData.ndv,
-                    colStatsData.nullCount,
-                    StatisticsUtil.quote(colStatsData.minLit),
-                    StatisticsUtil.quote(colStatsData.maxLit),
-                    colStatsData.dataSizeInBytes));
-            String insertSQL = "INSERT INTO "
-                    + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
-                    + " VALUES "
-                    + String.join(",", values);
-            stmtExecutor = new StmtExecutor(r.connectContext, insertSQL);
-            executeWithExceptionOnFail(stmtExecutor);
+            job.appendBuf(this, Collections.singletonList(colStatsData));
         }
     }
 
@@ -198,6 +127,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
     protected void doFull() throws Exception {
         Set<String> partitionNames = info.colToPartitions.get(info.colName);
         if (partitionNames.isEmpty()) {
+            job.appendBuf(this, Collections.emptyList());
             return;
         }
         Map<String, String> params = new HashMap<>();
@@ -212,68 +142,14 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         params.put("dbName", db.getFullName());
         params.put("colName", String.valueOf(info.colName));
         params.put("tblName", String.valueOf(tbl.getName()));
-        List<String> partitionAnalysisSQLs = new ArrayList<>();
-        try {
-            tbl.readLock();
-
-            for (String partitionName : partitionNames) {
-                Partition part = tbl.getPartition(partitionName);
-                if (part == null) {
-                    continue;
-                }
-                params.put("partId", 
String.valueOf(tbl.getPartition(partitionName).getId()));
-                // Avoid error when get the default partition
-                params.put("partitionName", "`" + partitionName + "`");
-                StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
-                
partitionAnalysisSQLs.add(stringSubstitutor.replace(COLLECT_PARTITION_STATS_SQL_TEMPLATE));
-            }
-        } finally {
-            tbl.readUnlock();
-        }
-        execSQLs(partitionAnalysisSQLs, params);
+        execSQL(params);
     }
 
     @VisibleForTesting
-    public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, 
String> params) throws Exception {
-        long startTime = System.currentTimeMillis();
-        LOG.debug("analyze task {} start at {}", info.toString(), new Date());
-        try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
-            List<List<String>> sqlGroups = 
Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
-            for (List<String> group : sqlGroups) {
-                if (killed) {
-                    return;
-                }
-                StringJoiner partitionCollectSQL = new StringJoiner("UNION 
ALL");
-                group.forEach(partitionCollectSQL::add);
-                stmtExecutor = new StmtExecutor(r.connectContext, 
partitionCollectSQL.toString());
-                buf.add(stmtExecutor.executeInternalQuery()
-                        
.stream().map(ColStatsData::new).collect(Collectors.toList()));
-                QueryState queryState = r.connectContext.getState();
-                if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                    throw new RuntimeException(String.format("Failed to 
analyze %s.%s.%s, error: %s sql: %s",
-                            catalog.getName(), db.getFullName(), info.colName, 
partitionCollectSQL,
-                            queryState.getErrorMessage()));
-                }
-            }
-            for (List<ColStatsData> colStatsDataList : buf) {
-                StringBuilder batchInsertSQL =
-                        new StringBuilder("INSERT INTO " + 
StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
-                                + " VALUES ");
-                StringJoiner sj = new StringJoiner(",");
-                colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
-                batchInsertSQL.append(sj.toString());
-                stmtExecutor = new StmtExecutor(r.connectContext, 
batchInsertSQL.toString());
-                executeWithExceptionOnFail(stmtExecutor);
-            }
-            params.put("type", col.getType().toString());
-            StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
-            String sql = 
stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
-            stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            executeWithExceptionOnFail(stmtExecutor);
-        } finally {
-            LOG.debug("analyze task {} end. cost {}ms", info,
-                    System.currentTimeMillis() - startTime);
-        }
+    public void execSQL(Map<String, String> params) throws Exception {
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String collectColStats = 
stringSubstitutor.replace(COLLECT_COL_STATISTICS);
+        runQuery(collectColStats);
     }
 
     // Get sample tablets id and scale up scaleFactor
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6b8297d0c0..f008c8fe301 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -78,12 +78,20 @@ public class StatisticConstants {
 
     public static final int LOAD_RETRY_TIMES = 3;
 
-    // union more relation than 512 may cause StackOverFlowException in the 
future.
-    public static final int UNION_ALL_LIMIT = 512;
-
     public static final String FULL_AUTO_ANALYZE_START_TIME = "00:00:00";
     public static final String FULL_AUTO_ANALYZE_END_TIME = "23:59:59";
 
+    public static final int INSERT_MERGE_ITEM_COUNT = 200;
+
+    public static final long HUGE_TABLE_DEFAULT_SAMPLE_ROWS = 4194304;
+    public static final long HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = 5L * 1024 
* 1024 * 1024;
+
+    public static final long HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = 
TimeUnit.HOURS.toMillis(12);
+
+    public static final int TABLE_STATS_HEALTH_THRESHOLD = 60;
+
+    public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
+
     static {
         SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
                 + ClusterNamespace.CLUSTER_DELIMITER + 
FeConstants.INTERNAL_DB_NAME);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 325065d6e26..32cf5cfb24b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -113,7 +113,7 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
         if (!(table instanceof OlapTable || table instanceof ExternalTable)) {
             return true;
         }
-        if (table.getDataSize(true) < 
Config.huge_table_lower_bound_size_in_bytes) {
+        if (table.getDataSize(true) < 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
             return false;
         }
         TableStatsMeta tableStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
@@ -121,12 +121,13 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
         if (tableStats == null) {
             return false;
         }
-        return System.currentTimeMillis() - tableStats.updatedTime < 
Config.huge_table_auto_analyze_interval_in_millis;
+        return System.currentTimeMillis()
+                - tableStats.updatedTime < 
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
     }
 
     protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
             List<AnalysisInfo> analysisInfos, TableIf table) {
-        AnalysisMethod analysisMethod = table.getDataSize(true) > 
Config.huge_table_lower_bound_size_in_bytes
+        AnalysisMethod analysisMethod = table.getDataSize(true) > 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
                 ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
         AnalysisInfo jobInfo = new AnalysisInfoBuilder()
                 .setJobId(Env.getCurrentEnv().getNextId())
@@ -141,7 +142,7 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
                 .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
                 .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
                 .setAnalysisMethod(analysisMethod)
-                .setSampleRows(Config.huge_table_default_sample_rows)
+                .setSampleRows(StatisticsUtil.getHugeTableSampleRows())
                 .setScheduleType(ScheduleType.AUTOMATIC)
                 .setState(AnalysisState.PENDING)
                 .setTaskIds(new ArrayList<>())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index c2f1db6bc4a..638db553987 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -73,14 +73,15 @@ public abstract class StatisticsCollector extends 
MasterDaemon {
             return;
         }
 
-        Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
+        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
         AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, 
false);
+        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
+        Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, 
analysisTasks.values());
         if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, 
jobInfo.tblId)) {
-            analysisManager.createTableLevelTaskForExternalTable(jobInfo, 
analysisTaskInfos, false);
+            analysisManager.createTableLevelTaskForExternalTable(jobInfo, 
analysisTasks, false);
         }
-        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTaskInfos);
-        analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
+        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, 
analysisTasks);
+        analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
     }
 
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
deleted file mode 100644
index f34ad0f1221..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class StatisticsPeriodCollector extends StatisticsCollector {
-    private static final Logger LOG = 
LogManager.getLogger(StatisticsPeriodCollector.class);
-
-    public StatisticsPeriodCollector() {
-        super("Automatic Analyzer",
-                
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
-                new 
AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num));
-    }
-
-    @Override
-    protected void collect() {
-        try {
-            AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-            List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
-            for (AnalysisInfo jobInfo : jobInfos) {
-                createSystemAnalysisJob(jobInfo);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to periodically analyze the statistics." + e);
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
index 3f9b2641b75..7cd8817a1a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
@@ -19,6 +19,8 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.statistics.util.StatisticsUtil;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.StringJoiner;
 
 public class StatsId {
@@ -34,6 +36,17 @@ public class StatsId {
     // nullable
     public final String partId;
 
+    @VisibleForTesting
+    public StatsId() {
+        this.id = null;
+        this.catalogId = -1;
+        this.dbId = -1;
+        this.tblId = -1;
+        this.idxId = -1;
+        this.colId = null;
+        this.partId = null;
+    }
+
     public StatsId(ResultRow row) {
         this.id = row.get(0);
         this.catalogId = Long.parseLong(row.get(1));
@@ -52,7 +65,7 @@ public class StatsId {
         sj.add(String.valueOf(tblId));
         sj.add(String.valueOf(idxId));
         sj.add(StatisticsUtil.quote(colId));
-        sj.add(StatisticsUtil.quote(partId));
+        sj.add(partId);
         return sj.toString();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index cc0fb334a39..931f22d7b02 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -177,12 +177,14 @@ public class StatisticsUtil {
         sessionVariable.enablePageCache = false;
         sessionVariable.parallelExecInstanceNum = 
Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.parallelPipelineTaskNum = 
Config.statistics_sql_parallel_exec_instance_num;
-        sessionVariable.setEnableNereidsPlanner(false);
+        sessionVariable.setEnableNereidsPlanner(true);
+        sessionVariable.setEnablePipelineEngine(false);
         sessionVariable.enableProfile = false;
         sessionVariable.enableScanRunSerial = limitScan;
-        sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 
60 * 60;
-        sessionVariable.insertTimeoutS = Config.analyze_task_timeout_in_hours 
* 60 * 60;
+        sessionVariable.queryTimeoutS = StatisticsUtil.getAnalyzeTimeout();
+        sessionVariable.insertTimeoutS = StatisticsUtil.getAnalyzeTimeout();
         sessionVariable.enableFileCache = false;
+        sessionVariable.forbidUnknownColStats = false;
         connectContext.setEnv(Env.getCurrentEnv());
         connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
         connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
@@ -807,7 +809,7 @@ public class StatisticsUtil {
 
     public static boolean inAnalyzeTime(LocalTime now) {
         try {
-            Pair<LocalTime, LocalTime> range = findRangeFromGlobalSessionVar();
+            Pair<LocalTime, LocalTime> range = 
findConfigFromGlobalSessionVar();
             if (range == null) {
                 return false;
             }
@@ -824,16 +826,16 @@ public class StatisticsUtil {
         }
     }
 
-    private static Pair<LocalTime, LocalTime> findRangeFromGlobalSessionVar() {
+    private static Pair<LocalTime, LocalTime> findConfigFromGlobalSessionVar() 
{
         try {
             String startTime =
-                    
findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
+                    
findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
                             .fullAutoAnalyzeStartTime;
             // For compatibility
             if (StringUtils.isEmpty(startTime)) {
                 startTime = StatisticConstants.FULL_AUTO_ANALYZE_START_TIME;
             }
-            String endTime = 
findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
+            String endTime = 
findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
                     .fullAutoAnalyzeEndTime;
             if (StringUtils.isEmpty(startTime)) {
                 endTime = StatisticConstants.FULL_AUTO_ANALYZE_END_TIME;
@@ -845,7 +847,7 @@ public class StatisticsUtil {
         }
     }
 
-    private static SessionVariable findRangeFromGlobalSessionVar(String 
varName) throws Exception {
+    protected static SessionVariable findConfigFromGlobalSessionVar(String 
varName) throws Exception {
         SessionVariable sessionVariable =  VariableMgr.newSessionVariable();
         VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
         VariableMgr.getValue(sessionVariable, variableExpr);
@@ -854,10 +856,71 @@ public class StatisticsUtil {
 
     public static boolean enableAutoAnalyze() {
         try {
-            return 
findRangeFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze;
+            return 
findConfigFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze;
         } catch (Exception e) {
             LOG.warn("Fail to get value of enable auto analyze, return false 
by default", e);
         }
         return false;
     }
+
+    public static int getInsertMergeCount() {
+        try {
+            return 
findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
+                    .statsInsertMergeItemCount;
+        } catch (Exception e) {
+            LOG.warn("Failed to get value of insert_merge_item_count, return 
default", e);
+        }
+        return StatisticConstants.INSERT_MERGE_ITEM_COUNT;
+    }
+
+    public static long getHugeTableSampleRows() {
+        try {
+            return 
findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_DEFAULT_SAMPLE_ROWS)
+                    .hugeTableDefaultSampleRows;
+        } catch (Exception e) {
+            LOG.warn("Failed to get value of huge_table_default_sample_rows, 
return default", e);
+        }
+        return StatisticConstants.HUGE_TABLE_DEFAULT_SAMPLE_ROWS;
+    }
+
+    public static long getHugeTableLowerBoundSizeInBytes() {
+        try {
+            return 
findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES)
+                    .hugeTableLowerBoundSizeInBytes;
+        } catch (Exception e) {
+            LOG.warn("Failed to get value of 
huge_table_lower_bound_size_in_bytes, return default", e);
+        }
+        return StatisticConstants.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES;
+    }
+
+    public static long getHugeTableAutoAnalyzeIntervalInMillis() {
+        try {
+            return 
findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
+                    .hugeTableAutoAnalyzeIntervalInMillis;
+        } catch (Exception e) {
+            LOG.warn("Failed to get value of 
huge_table_auto_analyze_interval_in_millis, return default", e);
+        }
+        return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
+    }
+
+    public static long getTableStatsHealthThreshold() {
+        try {
+            return 
findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD)
+                    .tableStatsHealthThreshold;
+        } catch (Exception e) {
+            LOG.warn("Failed to get value of table_stats_health_threshold, 
return default", e);
+        }
+        return StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
+    }
+
+    public static int getAnalyzeTimeout() {
+        try {
+            return 
findConfigFromGlobalSessionVar(SessionVariable.ANALYZE_TIMEOUT)
+                    .analyzeTimeoutS;
+        } catch (Exception e) {
+            LOG.warn("Failed to get value of table_stats_health_threshold, 
return default", e);
+        }
+        return StatisticConstants.ANALYZE_TIMEOUT_IN_SEC;
+    }
+
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index f01485f642f..d4dedd17123 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -17,25 +17,10 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.InternalSchemaInitializer;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisInfo.JobType;
-import org.apache.doris.statistics.util.DBObjects;
 import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.doris.utframe.TestWithFeService;
 
-import com.google.common.collect.Maps;
 import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
@@ -44,136 +29,196 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class AnalysisJobTest extends TestWithFeService {
-
-    @Override
-    protected void runBeforeAll() throws Exception {
-        try {
-            InternalSchemaInitializer.createDB();
-            createDatabase("analysis_job_test");
-            connectContext.setDatabase("default_cluster:analysis_job_test");
-            createTable("CREATE TABLE t1 (col1 int not null, col2 int not 
null, col3 int not null)\n"
-                    + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
-                    + "PROPERTIES(\n" + "    \"replication_num\"=\"1\"\n"
-                    + ");");
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        FeConstants.runningUnitTest = true;
-    }
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AnalysisJobTest {
 
+    // make user task has been set corresponding job
     @Test
-    public void testCreateAnalysisJob() throws Exception {
+    public void initTest(@Mocked AnalysisInfo jobInfo, @Mocked 
OlapAnalysisTask task) {
+        AnalysisJob analysisJob = new AnalysisJob(jobInfo, 
Arrays.asList(task));
+        Assertions.assertSame(task.job, analysisJob);
+    }
 
-        new MockUp<StatisticsUtil>() {
+    @Test
+    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked 
OlapAnalysisTask olapAnalysisTask) {
+        AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+        new MockUp<AnalysisJob>() {
+            @Mock
+            protected void writeBuf() {
+                writeBufInvokeTimes.incrementAndGet();
+            }
 
             @Mock
-            public AutoCloseConnectContext buildConnectContext() {
-                return new AutoCloseConnectContext(connectContext);
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public void execUpdate(String sql) throws Exception {
+            public void deregisterJob() {
             }
         };
+        AnalysisJob job = new AnalysisJob(analysisInfo, 
Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        job.totalTaskCount = 20;
+
+        // not all task finished nor cached limit exceed, shouldn't  write
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        Assertions.assertEquals(0, writeBufInvokeTimes.get());
+    }
 
-        new MockUp<StmtExecutor>() {
+    @Test
+    public void testAppendBufTest2(@Mocked AnalysisInfo analysisInfo, @Mocked 
OlapAnalysisTask olapAnalysisTask) {
+        AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+        AtomicInteger deregisterTimes = new AtomicInteger();
+
+        new MockUp<AnalysisJob>() {
             @Mock
-            public List<ResultRow> executeInternalQuery() {
-                return Collections.emptyList();
+            protected void writeBuf() {
+                writeBufInvokeTimes.incrementAndGet();
             }
-        };
 
-        new MockUp<ConnectContext>() {
+            @Mock
+            public void updateTaskState(AnalysisState state, String msg) {
+            }
 
             @Mock
-            public ConnectContext get() {
-                return connectContext;
+            public void deregisterJob() {
+                deregisterTimes.getAndIncrement();
             }
         };
-        String sql = "ANALYZE TABLE t1";
-        Assertions.assertNotNull(getSqlStmtExecutor(sql));
+        AnalysisJob job = new AnalysisJob(analysisInfo, 
Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        job.totalTaskCount = 1;
+
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        // all task finished, should write and deregister this job
+        Assertions.assertEquals(1, writeBufInvokeTimes.get());
+        Assertions.assertEquals(1, deregisterTimes.get());
     }
 
     @Test
-    public void testJobExecution(@Mocked StmtExecutor stmtExecutor, @Mocked 
InternalCatalog catalog, @Mocked
-            Database database,
-            @Mocked OlapTable olapTable)
-            throws Exception {
-        new MockUp<OlapTable>() {
+    public void testAppendBufTest3(@Mocked AnalysisInfo analysisInfo, @Mocked 
OlapAnalysisTask olapAnalysisTask) {
+        AtomicInteger writeBufInvokeTimes = new AtomicInteger();
 
+        new MockUp<AnalysisJob>() {
             @Mock
-            public Column getColumn(String name) {
-                return new Column("col1", PrimitiveType.INT);
+            protected void writeBuf() {
+                writeBufInvokeTimes.incrementAndGet();
             }
-        };
-
-        new MockUp<StatisticsUtil>() {
 
             @Mock
-            public ConnectContext buildConnectContext() {
-                return connectContext;
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public void execUpdate(String sql) throws Exception {
+            public void deregisterJob() {
             }
+        };
+        AnalysisJob job = new AnalysisJob(analysisInfo, 
Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        ColStatsData colStatsData = new ColStatsData();
+        for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
+            job.buf.add(colStatsData);
+        }
+        job.totalTaskCount = 100;
+
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        // cache limit exceed, should write them
+        Assertions.assertEquals(1, writeBufInvokeTimes.get());
+    }
 
+    @Test
+    public void testUpdateTaskState(
+            @Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1,
+            @Mocked OlapAnalysisTask task2) {
+        AtomicInteger updateTaskStatusInvokeTimes = new AtomicInteger();
+        new MockUp<AnalysisManager>() {
             @Mock
-            public DBObjects convertIdToObjects(long catalogId, long dbId, 
long tblId) {
-                return new DBObjects(catalog, database, olapTable);
+            public void updateTaskStatus(AnalysisInfo info, AnalysisState 
taskState, String message, long time) {
+                updateTaskStatusInvokeTimes.getAndIncrement();
             }
         };
-        new MockUp<StatisticsCache>() {
-
+        AnalysisManager analysisManager = new AnalysisManager();
+        new MockUp<Env>() {
             @Mock
-            public void syncLoadColStats(long tableId, long idxId, String 
colName) {
+            public AnalysisManager getAnalysisManager() {
+                return analysisManager;
             }
         };
-        new MockUp<StmtExecutor>() {
+        AnalysisJob job = new AnalysisJob(info, 
Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        job.updateTaskState(AnalysisState.FAILED, "");
+        Assertions.assertEquals(2, updateTaskStatusInvokeTimes.get());
+    }
 
+    @Test
+    public void testWriteBuf1(@Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+        AnalysisJob job = new AnalysisJob(info, 
Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        new MockUp<AnalysisJob>() {
             @Mock
-            public void execute() throws Exception {
-
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public List<ResultRow> executeInternalQuery() {
-                return new ArrayList<>();
-            }
-        };
+            protected void executeWithExceptionOnFail(StmtExecutor 
stmtExecutor) throws Exception {
 
-        new MockUp<OlapAnalysisTask>() {
+            }
 
             @Mock
-            public void execSQLs(List<String> partitionAnalysisSQLs, 
Map<String, String> params) throws Exception {}
+            protected void syncLoadStats() {
+            }
         };
-        HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
-        colToPartitions.put("col1", Collections.singleton("t1"));
-        AnalysisInfo analysisJobInfo = new 
AnalysisInfoBuilder().setJobId(0).setTaskId(0)
-                .setCatalogId(0)
-                .setDBId(0)
-                .setTblId(0)
-                .setColName("col1").setJobType(JobType.MANUAL)
-                .setAnalysisMode(AnalysisMode.FULL)
-                .setAnalysisMethod(AnalysisMethod.FULL)
-                .setAnalysisType(AnalysisType.FUNDAMENTALS)
-                .setColToPartitions(colToPartitions)
-                .setState(AnalysisState.RUNNING)
-                .build();
-        new OlapAnalysisTask(analysisJobInfo).doExecute();
         new Expectations() {
             {
-                stmtExecutor.execute();
+                job.syncLoadStats();
                 times = 1;
             }
         };
+        job.writeBuf();
+
+        Assertions.assertEquals(0, job.queryFinished.size());
+    }
+
+    @Test
+    public void testWriteBuf2(@Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+        new MockUp<AnalysisJob>() {
+            @Mock
+            public void updateTaskState(AnalysisState state, String msg) {
+            }
+
+            @Mock
+            protected void executeWithExceptionOnFail(StmtExecutor 
stmtExecutor) throws Exception {
+                throw new RuntimeException();
+            }
+
+            @Mock
+            protected void syncLoadStats() {
+            }
+        };
+        AnalysisJob job = new AnalysisJob(info, 
Collections.singletonList(task1));
+        job.buf.add(new ColStatsData());
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        job.writeBuf();
+        Assertions.assertEquals(1, job.queryFinished.size());
     }
 
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index c995710da44..6372ce97d6e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -340,7 +341,7 @@ public class AnalysisManagerTest {
         };
         OlapTable olapTable = new OlapTable();
         TableStatsMeta stats1 = new TableStatsMeta(0, 50, new 
AnalysisInfoBuilder().setColName("col1").build());
-        stats1.updatedRows.addAndGet(30);
+        stats1.updatedRows.addAndGet(50);
 
         Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
         TableStatsMeta stats2 = new TableStatsMeta(0, 190, new 
AnalysisInfoBuilder().setColName("col1").build());
@@ -349,4 +350,38 @@ public class AnalysisManagerTest {
 
     }
 
+    @Test
+    public void testRecordLimit1() {
+        Config.analyze_record_limit = 2;
+        AnalysisManager analysisManager = new AnalysisManager();
+        analysisManager.replayCreateAnalysisJob(new 
AnalysisInfoBuilder().setJobId(1).build());
+        analysisManager.replayCreateAnalysisJob(new 
AnalysisInfoBuilder().setJobId(2).build());
+        analysisManager.replayCreateAnalysisJob(new 
AnalysisInfoBuilder().setJobId(3).build());
+        Assertions.assertEquals(2, analysisManager.analysisJobInfoMap.size());
+        
Assertions.assertTrue(analysisManager.analysisJobInfoMap.containsKey(2L));
+        
Assertions.assertTrue(analysisManager.analysisJobInfoMap.containsKey(3L));
+    }
+
+    @Test
+    public void testRecordLimit2() {
+        Config.analyze_record_limit = 2;
+        AnalysisManager analysisManager = new AnalysisManager();
+        analysisManager.replayCreateAnalysisTask(new 
AnalysisInfoBuilder().setTaskId(1).build());
+        analysisManager.replayCreateAnalysisTask(new 
AnalysisInfoBuilder().setTaskId(2).build());
+        analysisManager.replayCreateAnalysisTask(new 
AnalysisInfoBuilder().setTaskId(3).build());
+        Assertions.assertEquals(2, analysisManager.analysisTaskInfoMap.size());
+        
Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(2L));
+        
Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(3L));
+    }
+
+    @Test
+    public void testRecordLimit3() {
+        Config.analyze_record_limit = 2;
+        AnalysisManager analysisManager = new AnalysisManager();
+        analysisManager.autoJobs.offer(new 
AnalysisInfoBuilder().setJobId(1).build());
+        analysisManager.autoJobs.offer(new 
AnalysisInfoBuilder().setJobId(2).build());
+        analysisManager.autoJobs.offer(new 
AnalysisInfoBuilder().setJobId(3).build());
+        Assertions.assertEquals(2, analysisManager.autoJobs.size());
+    }
+
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 19d7798041a..8cfcfeabd28 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Maps;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -45,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class AnalysisTaskExecutorTest extends TestWithFeService {
 
@@ -82,6 +84,15 @@ public class AnalysisTaskExecutorTest extends 
TestWithFeService {
                 return new Column("col1", PrimitiveType.INT);
             }
         };
+        final AtomicBoolean cancelled = new AtomicBoolean();
+        new MockUp<AnalysisTaskWrapper>() {
+
+            @Mock
+            public boolean cancel(String msg) {
+                cancelled.set(true);
+                return true;
+            }
+        };
         AnalysisInfo analysisJobInfo = new 
AnalysisInfoBuilder().setJobId(0).setTaskId(0)
                 .setCatalogId(0)
                 .setDBId(0)
@@ -98,7 +109,10 @@ public class AnalysisTaskExecutorTest extends 
TestWithFeService {
         AnalysisTaskWrapper analysisTaskWrapper = new 
AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
         Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
         b.put(analysisTaskWrapper);
-        analysisTaskExecutor.start();
+        analysisTaskExecutor.tryToCancel();
+        Assertions.assertTrue(cancelled.get());
+        Assertions.assertTrue(b.isEmpty());
+
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
similarity index 97%
copy from 
fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
copy to fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
index f01485f642f..268540885da 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
@@ -50,7 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class AnalysisJobTest extends TestWithFeService {
+public class AnalyzeTest extends TestWithFeService {
 
     @Override
     protected void runBeforeAll() throws Exception {
@@ -154,6 +154,12 @@ public class AnalysisJobTest extends TestWithFeService {
             @Mock
             public void execSQLs(List<String> partitionAnalysisSQLs, 
Map<String, String> params) throws Exception {}
         };
+
+        new MockUp<BaseAnalysisTask>() {
+
+            @Mock
+            protected void runQuery(String sql) {}
+        };
         HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
         colToPartitions.put("col1", Collections.singleton("t1"));
         AnalysisInfo analysisJobInfo = new 
AnalysisInfoBuilder().setJobId(0).setTaskId(0)
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
index f5b98a47ce0..95ed5023e36 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.datasource.CatalogMgr;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.ha.FrontendNodeType;
@@ -31,6 +32,9 @@ import org.apache.doris.system.Frontend;
 import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.Lists;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
@@ -40,9 +44,11 @@ import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,6 +56,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
 
 public class CacheTest extends TestWithFeService {
 
@@ -350,4 +357,29 @@ public class CacheTest extends TestWithFeService {
             }
         };
     }
+
+    @Test
+    public void testEvict() {
+        ThreadPoolExecutor threadPool
+                = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, Integer.MAX_VALUE, "STATS_FETCH", true);
+        AsyncLoadingCache<Integer, Integer> columnStatisticsCache =
+                Caffeine.newBuilder()
+                        .maximumSize(1)
+                        
.refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                        .executor(threadPool)
+                        .buildAsync(new AsyncCacheLoader<Integer, Integer>() {
+                            @Override
+                            public @NonNull CompletableFuture<Integer> 
asyncLoad(@NonNull Integer integer,
+                                    @NonNull Executor executor) {
+                                return CompletableFuture.supplyAsync(() -> {
+                                    return integer;
+                                }, threadPool);
+                            }
+                        });
+        columnStatisticsCache.get(1);
+        columnStatisticsCache.get(2);
+        
Assertions.assertTrue(columnStatisticsCache.synchronous().asMap().containsKey(2));
+        Assertions.assertEquals(1, 
columnStatisticsCache.synchronous().asMap().size());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
index d618a5fa538..f2b9f84f0d0 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
@@ -19,47 +19,36 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.TableSample;
 import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
-import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
 import mockit.Mocked;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class OlapAnalysisTaskTest {
 
+    // test manual
     @Test
-    public void testAutoSample(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf 
databaseIf, @Mocked TableIf tableIf) {
-        new Expectations() {
-            {
-                tableIf.getDataSize(true);
-                result = 60_0000_0000L;
-            }
-        };
+    public void testSample1(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf 
databaseIf, @Mocked TableIf tableIf) {
 
         AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
                 .setAnalysisMethod(AnalysisMethod.FULL);
+        analysisInfoBuilder.setJobType(JobType.MANUAL);
         OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
         olapAnalysisTask.info = analysisInfoBuilder.build();
         olapAnalysisTask.tbl = tableIf;
-        Config.enable_auto_sample = true;
         TableSample tableSample = olapAnalysisTask.getTableSample();
-        Assertions.assertEquals(4194304, tableSample.getSampleValue());
-        Assertions.assertFalse(tableSample.isPercent());
-
-        new Expectations() {
-            {
-                tableIf.getDataSize(true);
-                result = 1_0000_0000L;
-            }
-        };
-        tableSample = olapAnalysisTask.getTableSample();
         Assertions.assertNull(tableSample);
 
         analysisInfoBuilder.setSampleRows(10);
+        analysisInfoBuilder.setJobType(JobType.MANUAL);
         analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.SAMPLE);
         olapAnalysisTask.info = analysisInfoBuilder.build();
         tableSample = olapAnalysisTask.getTableSample();
@@ -67,4 +56,49 @@ public class OlapAnalysisTaskTest {
         Assertions.assertFalse(tableSample.isPercent());
     }
 
+    // test auto big table
+    @Test
+    public void testSample2(@Mocked OlapTable tbl) {
+        new MockUp<OlapTable>() {
+
+            @Mock
+            public long getDataSize(boolean singleReplica) {
+                return 1000_0000_0000L;
+            }
+        };
+
+        AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
+                .setAnalysisMethod(AnalysisMethod.FULL);
+        analysisInfoBuilder.setJobType(JobType.SYSTEM);
+        OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+        olapAnalysisTask.info = analysisInfoBuilder.build();
+        olapAnalysisTask.tbl = tbl;
+        TableSample tableSample = olapAnalysisTask.getTableSample();
+        Assertions.assertNotNull(tableSample);
+        Assertions.assertEquals(StatisticsUtil.getHugeTableSampleRows(), 
tableSample.getSampleValue());
+
+    }
+
+    // test auto small table
+    @Test
+    public void testSample3(@Mocked OlapTable tbl) {
+        new MockUp<OlapTable>() {
+
+            @Mock
+            public long getDataSize(boolean singleReplica) {
+                return 1000;
+            }
+        };
+
+        AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
+                .setAnalysisMethod(AnalysisMethod.FULL);
+        analysisInfoBuilder.setJobType(JobType.SYSTEM);
+        OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+        olapAnalysisTask.info = analysisInfoBuilder.build();
+        olapAnalysisTask.tbl = tbl;
+        TableSample tableSample = olapAnalysisTask.getTableSample();
+        Assertions.assertNull(tableSample);
+
+    }
+
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
new file mode 100644
index 00000000000..d441ce5b09d
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.View;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.SystemInfoService;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.hadoop.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StatisticsAutoCollectorTest {
+
+    @Test
+    public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
+        new MockUp<CatalogIf>() {
+            @Mock
+            public Collection<DatabaseIf> getAllDbs() {
+                Database db1 = new Database(1, 
SystemInfoService.DEFAULT_CLUSTER
+                        + ClusterNamespace.CLUSTER_DELIMITER + 
FeConstants.INTERNAL_DB_NAME);
+                Database db2 = new Database(2, "anyDB");
+                List<DatabaseIf> databaseIfs = new ArrayList<>();
+                databaseIfs.add(db1);
+                databaseIfs.add(db2);
+                return databaseIfs;
+            }
+        };
+        new MockUp<StatisticsAutoCollector>() {
+            @Mock
+            public List<AnalysisInfo> 
constructAnalysisInfo(DatabaseIf<TableIf> db) {
+                return Arrays.asList(analysisInfo, analysisInfo);
+            }
+
+            int count = 0;
+
+            @Mock
+            public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) 
{
+                return count++ == 0 ? null : jobInfo;
+            }
+
+            @Mock
+            public void createSystemAnalysisJob(AnalysisInfo jobInfo)
+                    throws DdlException {
+
+            }
+        };
+
+        StatisticsAutoCollector saa = new StatisticsAutoCollector();
+        saa.runAfterCatalogReady();
+        new Expectations() {
+            {
+                try {
+                    saa.createSystemAnalysisJob((AnalysisInfo) any);
+                    times = 1;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    @Test
+    public void testConstructAnalysisInfo(
+            @Injectable OlapTable o2, @Injectable View v) {
+        new MockUp<Database>() {
+            @Mock
+            public List<Table> getTables() {
+                List<Table> tableIfs = new ArrayList<>();
+                tableIfs.add(o2);
+                tableIfs.add(v);
+                return tableIfs;
+            }
+
+            @Mock
+            public String getFullName() {
+                return "anyDb";
+            }
+        };
+
+        new MockUp<OlapTable>() {
+            @Mock
+            public String getName() {
+                return "anytable";
+            }
+
+            @Mock
+            public List<Column> getBaseSchema() {
+                List<Column> columns = new ArrayList<>();
+                columns.add(new Column("c1", PrimitiveType.INT));
+                columns.add(new Column("c2", PrimitiveType.HLL));
+                return columns;
+            }
+        };
+        StatisticsAutoCollector saa = new StatisticsAutoCollector();
+        List<AnalysisInfo> analysisInfos =
+                saa.constructAnalysisInfo(new Database(1, "anydb"));
+        Assertions.assertEquals(1, analysisInfos.size());
+        Assertions.assertEquals("c1", 
analysisInfos.get(0).colName.split(",")[0]);
+    }
+
+    @Test
+    public void testGetReAnalyzeRequiredPart0() {
+
+        TableIf tableIf = new OlapTable();
+
+        new MockUp<OlapTable>() {
+            @Mock
+            protected Map<String, Set<String>> findReAnalyzeNeededPartitions() 
{
+                Set<String> partitionNames = new HashSet<>();
+                partitionNames.add("p1");
+                partitionNames.add("p2");
+                Map<String, Set<String>> map = new HashMap<>();
+                map.put("col1", partitionNames);
+                return map;
+            }
+
+            @Mock
+            public long getRowCount() {
+                return 100;
+            }
+
+            @Mock
+            public List<Column> getBaseSchema() {
+                return Lists.newArrayList(new Column("col1", Type.INT), new 
Column("col2", Type.INT));
+            }
+        };
+
+        new MockUp<StatisticsUtil>() {
+            @Mock
+            public TableIf findTable(long catalogName, long dbName, long 
tblName) {
+                return tableIf;
+            }
+        };
+        AnalysisInfo analysisInfo = new 
AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
+                
AnalysisType.FUNDAMENTALS).setColName("col1").setJobType(JobType.SYSTEM).build();
+        new MockUp<AnalysisManager>() {
+
+            int count = 0;
+
+            TableStatsMeta[] tableStatsArr =
+                    new TableStatsMeta[] {new TableStatsMeta(0, 0, 
analysisInfo),
+                            new TableStatsMeta(0, 0, analysisInfo), null};
+
+            {
+                tableStatsArr[0].updatedRows.addAndGet(100);
+                tableStatsArr[1].updatedRows.addAndGet(0);
+            }
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return tableStatsArr[count++];
+            }
+        };
+
+        new MockUp<StatisticsAutoCollector>() {
+            @Mock
+            public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, 
TableIf table,
+                    Set<String> needRunPartitions) {
+                return new AnalysisInfoBuilder().build();
+            }
+        };
+        StatisticsAutoCollector statisticsAutoCollector = new 
StatisticsAutoCollector();
+        AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder()
+                .setCatalogId(0)
+                .setDBId(0)
+                .setTblId(0).build();
+        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+        // uncomment it when updatedRows gets ready
+        // 
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+    }
+
+    @Test
+    public void testLoop() {
+        AtomicBoolean timeChecked = new AtomicBoolean();
+        AtomicBoolean switchChecked = new AtomicBoolean();
+        new MockUp<StatisticsUtil>() {
+
+            @Mock
+            public boolean inAnalyzeTime(LocalTime now) {
+                timeChecked.set(true);
+                return true;
+            }
+
+            @Mock
+            public boolean enableAutoAnalyze() {
+                switchChecked.set(true);
+                return true;
+            }
+        };
+        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+        autoCollector.collect();
+        Assertions.assertTrue(timeChecked.get() && switchChecked.get());
+
+    }
+
+    @Test
+    public void checkAvailableThread() {
+        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+        
Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num,
+                
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
+    }
+
+    @Test
+    public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta 
stats, @Mocked TableIf anyOtherTable) {
+        new MockUp<OlapTable>() {
+
+            @Mock
+            public long getDataSize(boolean singleReplica) {
+                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 
+ 1000000000;
+            }
+        };
+
+        new MockUp<AnalysisManager>() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return stats;
+            }
+        };
+        // A very huge table has been updated recently, so we should skip it 
this time
+        stats.updatedTime = System.currentTimeMillis() - 1000;
+        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+        Assertions.assertTrue(autoCollector.skip(olapTable));
+        // The update of this huge table is long time ago, so we shouldn't 
skip it this time
+        stats.updatedTime = System.currentTimeMillis()
+                - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 
10000;
+        Assertions.assertFalse(autoCollector.skip(olapTable));
+        new MockUp<AnalysisManager>() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return null;
+            }
+        };
+        // can't find table stats meta, which means this table never get 
analyzed,  so we shouldn't skip it this time
+        Assertions.assertFalse(autoCollector.skip(olapTable));
+        // this is not olap table nor external table, so we should skip it 
this time
+        Assertions.assertTrue(autoCollector.skip(anyOtherTable));
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
index c0d4a656d75..c0c790c9c25 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
@@ -19,9 +19,15 @@ package org.apache.doris.statistics.util;
 
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.qe.SessionVariable;
 
-import org.junit.Test;
+import mockit.Mock;
+import mockit.MockUp;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
 
 public class StatisticsUtilTest {
     @Test
@@ -67,4 +73,42 @@ public class StatisticsUtilTest {
             Assertions.fail();
         }
     }
+
+    @Test
+    public void testInAnalyzeTime1() {
+        new MockUp<StatisticsUtil>() {
+
+            @Mock
+            protected SessionVariable findConfigFromGlobalSessionVar(String 
varName) throws Exception {
+                SessionVariable sessionVariable = new SessionVariable();
+                sessionVariable.fullAutoAnalyzeStartTime = "00:00:00";
+                sessionVariable.fullAutoAnalyzeEndTime = "02:00:00";
+                return sessionVariable;
+            }
+        };
+        DateTimeFormatter timeFormatter = 
DateTimeFormatter.ofPattern("HH:mm:ss");
+        String now = "01:00:00";
+        
Assertions.assertTrue(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, 
timeFormatter)));
+        now = "13:00:00";
+        
Assertions.assertFalse(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, 
timeFormatter)));
+    }
+
+    @Test
+    public void testInAnalyzeTime2() {
+        new MockUp<StatisticsUtil>() {
+
+            @Mock
+            protected SessionVariable findConfigFromGlobalSessionVar(String 
varName) throws Exception {
+                SessionVariable sessionVariable = new SessionVariable();
+                sessionVariable.fullAutoAnalyzeStartTime = "00:00:00";
+                sessionVariable.fullAutoAnalyzeEndTime = "23:00:00";
+                return sessionVariable;
+            }
+        };
+        DateTimeFormatter timeFormatter = 
DateTimeFormatter.ofPattern("HH:mm:ss");
+        String now = "15:00:00";
+        
Assertions.assertTrue(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, 
timeFormatter)));
+        now = "23:30:00";
+        
Assertions.assertFalse(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, 
timeFormatter)));
+    }
 }
diff --git a/regression-test/suites/statistics/analyze_stats.groovy 
b/regression-test/suites/statistics/analyze_stats.groovy
index 4e4c4a08425..3a33b672bfe 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -57,7 +57,11 @@ suite("test_analyze") {
             `analyzetestlimitedk8` double null comment "",
             `analyzetestlimitedk9` float null comment "",
             `analyzetestlimitedk12` string  null comment "",
-            `analyzetestlimitedk13` largeint(40)  null comment ""
+            `analyzetestlimitedk13` largeint(40)  null comment "",
+            `analyzetestlimitedk14` ARRAY<int(11)> NULL COMMENT "",
+            `analyzetestlimitedk15` Map<STRING, INT> NULL COMMENT "",
+            `analyzetestlimitedk16` STRUCT<s_id:int(11), s_name:string, 
s_address:string> NULL,
+            `analyzetestlimitedk17` JSON NULL
         ) engine=olap
         DUPLICATE KEY(`analyzetestlimitedk3`)
         DISTRIBUTED BY HASH(`analyzetestlimitedk3`) BUCKETS 5 
properties("replication_num" = "1")
@@ -67,26 +71,39 @@ suite("test_analyze") {
         INSERT INTO `${tbl}` VALUES 
(-2103297891,1,101,15248,4761818404925265645,939926.283,
         'UTmCFKMbprf0zSVOIlBJRNOl3JcNBdOsnCDt','2022-09-28','2022-10-28 
01:56:56','tVvGDSrN6kyn',
         
-954349107.187117,-40.46286,'g1ZP9nqVgaGKya3kPERdBofTWJQ4TIJEz972Xvw4hfPpTpWwlmondiLVTCyld7rSBlSWrE7NJRB0pvPGEFQKOx1s3',
-        '-1559301292834325905'),
+        '-1559301292834325905', NULL, NULL, NULL, NULL),
         
(-2094982029,0,-81,-14746,-2618177187906633064,121889.100,NULL,'2023-05-01','2022-11-25
 00:24:12',
         
'36jVI0phYfhFucAOEASbh4OdvUYcI7QZFgQSveNyfGcRRUtQG9HGN1UcCmUH',-82250254.174239,NULL,
-        
'bTUHnMC4v7dI8U3TK0z4wZHdytjfHQfF1xKdYAVwPVNMT4fT4F92hj8ENQXmCkWtfp','6971810221218612372'),
+        
'bTUHnMC4v7dI8U3TK0z4wZHdytjfHQfF1xKdYAVwPVNMT4fT4F92hj8ENQXmCkWtfp','6971810221218612372',
 NULL, NULL, NULL, NULL),
         
(-1840301109,1,NULL,NULL,7805768460922079440,546556.220,'wC7Pif9SJrg9b0wicGfPz2ezEmEKotmN6AMI',NULL,
         '2023-05-20 18:13:14','NM5SLu62SGeuD',-1555800813.9748349,-11122.953,
-        
'NH97wIjXk7dspvvfUUKe41ZetUnDmqLxGg8UYXwOwK3Jlu7dxO2GE9UJjyKW0NBxqUk1DY','-5004534044262380098'),
+        
'NH97wIjXk7dspvvfUUKe41ZetUnDmqLxGg8UYXwOwK3Jlu7dxO2GE9UJjyKW0NBxqUk1DY','-5004534044262380098',
 NULL, NULL, NULL, NULL),
         
(-1819679967,0,10,NULL,-5772413527188525359,-532045.626,'kqMe4VYEZAmajLthCLRkl8StDQHKrDWz91AQ','2022-06-30',
         '2023-02-22 
15:30:38','wAbeF3p84j5pFJTInQuKZOezFbsy8HIjmuUF',-1766437367.4377379,1791.4128,
-        
'6OWmBD04UeKt1xI2XnR8t1kPG7qEYrf4J8RkA8UMs4HF33Yl','-8433424551792664598'),
+        
'6OWmBD04UeKt1xI2XnR8t1kPG7qEYrf4J8RkA8UMs4HF33Yl','-8433424551792664598', 
NULL, NULL, NULL, NULL),
         
(-1490846276,0,NULL,7744,6074522476276146996,594200.976,NULL,'2022-11-27','2023-03-11
 21:28:44',
         
'yr8AuJLr2ud7DIwlt06cC7711UOsKslcDyySuqqfQE5X7Vjic6azHOrM6W',-715849856.288922,3762.217,
-        
'4UpWZJ0Twrefw0Tm0AxFS38V5','7406302706201801560'),(-1465848366,1,72,29170,-5585523608136628843,-34210.874,
+        '4UpWZJ0Twrefw0Tm0AxFS38V5','7406302706201801560', NULL, NULL, NULL, 
NULL),(-1465848366,1,72,29170,-5585523608136628843,-34210.874,
         'rMGygAWU91Wa3b5A7l1wheo6EF0o6zhw4YeE','2022-09-20','2023-06-11 
18:17:16','B6m9S9O2amsa4SXrEKK0ivJ2x9m1u8av',
-        
862085772.298349,-22304.209,'1','-3399178642401166400'),(-394034614,1,65,5393,-200651968801088119,NULL,
+        862085772.298349,-22304.209,'1','-3399178642401166400', NULL, NULL, 
NULL, NULL),(-394034614,1,65,5393,-200651968801088119,NULL,
         '9MapWX9pn8zes9Gey1lhRsH3ATyQPIysjQYi','2023-05-11','2022-07-02 
02:56:53','z5VWbuKr6HiK7yC7MRIoQGrb98VUS',
-        
1877828963.091433,-1204.1926,'fSDQqT38rkrJEi6fwc90rivgQcRPaW5V1aEmZpdSvUm','8882970420609470903'),
+        
1877828963.091433,-1204.1926,'fSDQqT38rkrJEi6fwc90rivgQcRPaW5V1aEmZpdSvUm','8882970420609470903',
 NULL, NULL, NULL, NULL),
         
(-287465855,0,-10,-32484,-5161845307234178602,748718.592,'n64TXbG25DQL5aw5oo9o9cowSjHCXry9HkId','2023-01-02',
         '2022-11-17 
14:58:52','d523m4PwLdHZtPTqSoOBo5IGivCKe4A1Sc8SKCILFxgzYLe0',NULL,27979.855,
-        
'ps7qwcZjBjkGfcXYMw5HQMwnElzoHqinwk8vhQCbVoGBgfotc4oSkpD3tP34h4h0tTogDMwFu60iJm1bofUzyUQofTeRwZk8','4692206687866847780')
+        
'ps7qwcZjBjkGfcXYMw5HQMwnElzoHqinwk8vhQCbVoGBgfotc4oSkpD3tP34h4h0tTogDMwFu60iJm1bofUzyUQofTeRwZk8','4692206687866847780',
 NULL, NULL, NULL, NULL)
+    """
+
+    sql """
+        SET enable_nereids_planner=true;
+        
+    """
+
+    sql """
+        SET forbid_unknown_col_stats=false;
+    """
+
+    sql """
+        SELECT * FROM ${tbl}
     """
 
     sql """
@@ -97,10 +114,6 @@ suite("test_analyze") {
         ANALYZE DATABASE ${db} WITH SYNC
     """
 
-    sql """
-        SET enable_nereids_planner=true;
-        
-        """
     sql """
         SET enable_fallback_to_original_planner=false;
         """
@@ -110,9 +123,9 @@ suite("test_analyze") {
 
     Thread.sleep(1000 * 60)
 
-    sql """
-        SELECT * FROM ${tbl}; 
-    """
+//    sql """
+//        SELECT * FROM ${tbl};
+//    """
 
     sql """
         DROP STATS ${tbl}(analyzetestlimitedk3)
@@ -120,51 +133,51 @@ suite("test_analyze") {
 
     def exception = null
 
-    try {
-        sql """
-            SELECT * FROM ${tbl}; 
-        """
-    } catch (Exception e) {
-        exception = e
-    }
-
-    assert exception != null
-
-    exception = null
+//    try {
+//        sql """
+//            SELECT * FROM ${tbl};
+//        """
+//    } catch (Exception e) {
+//        exception = e
+//    }
+//
+//    assert exception != null
+//
+//    exception = null
 
     sql """
         ANALYZE TABLE ${tbl} WITH SYNC
     """
 
-    sql """
-        SELECT * FROM ${tbl}; 
-    """
+//    sql """
+//        SELECT * FROM ${tbl};
+//    """
 
     sql """
         DROP STATS ${tbl}
     """
 
-    try {
-        sql """
-            SELECT * FROM ${tbl}; 
-        """
-    } catch (Exception e) {
-        exception = e
-    }
+//    try {
+//        sql """
+//            SELECT * FROM ${tbl};
+//        """
+//    } catch (Exception e) {
+//        exception = e
+//    }
 
-    a_result_1 = sql """
+    def a_result_1 = sql """
         ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 10
     """
 
-    a_result_2 = sql """
+    def a_result_2 = sql """
         ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 5
     """
 
-    a_result_3 = sql """
+    def a_result_3 = sql """
         ANALYZE DATABASE ${db} WITH SAMPLE PERCENT 5
     """
 
-    show_result = sql """
+    def show_result = sql """
         SHOW ANALYZE
     """
 
@@ -891,8 +904,24 @@ PARTITION `p599` VALUES IN (599)
     }
 
     assert expected_col_stats(id_col_stats, 600, 1)
-    assert expected_col_stats(id_col_stats, 599, 7)
+    assert (int) Double.parseDouble(id_col_stats[0][2]) < 700
+            && (int) Double.parseDouble(id_col_stats[0][2]) > 500
+    assert expected_col_stats(id_col_stats, 0, 3)
+    assert expected_col_stats(id_col_stats, 2400, 4)
+    assert expected_col_stats(id_col_stats, 4, 5)
     assert expected_col_stats(id_col_stats, 0, 6)
+    assert expected_col_stats(id_col_stats, 599, 7)
+
+    def update_time = id_col_stats[0][8]
+
+    sql """ANALYZE TABLE test_600_partition_table_analyze WITH SYNC"""
+
+    // Data has no change, update time shouldn't be update since this table 
don't need to analyze again
+    id_col_stats_2 = sql """
+        SHOW COLUMN CACHED STATS test_600_partition_table_analyze(id);
+    """
+
+    assert update_time == id_col_stats_2[0][8]
 
     sql """DROP TABLE IF EXISTS increment_analyze_test"""
     sql """
@@ -1151,4 +1180,39 @@ PARTITION `p599` VALUES IN (599)
         return (r[0][7]).equals(expected_value)
     }
     expected_max(max, "测试")
+
+    show_result = sql """
+        SHOW ANALYZE ${tbl}
+    """
+
+    def tbl_name_as_expetected = { r,name ->
+        for (int i = 0; i < r.size; i++) {
+            if (r[i][3] != name) {
+                return false
+            }
+        }
+        return true
+    }
+
+    assert show_result[0][9] == "FINISHED"
+    assert tbl_name_as_expetected(show_result, "${tbl}")
+
+    show_result = sql """
+        SHOW ANALYZE ${tbl} WHERE STATE = "FINISHED"
+    """
+
+    assert show_result.size() > 0
+
+    def all_finished = { r ->
+        for (int i = 0; i < r.size; i++) {
+            if (r[i][9] != "FINISHED") {
+                return  false
+            }
+        }
+        return true
+    }
+
+    assert all_finished(show_result)
+
+
 }
diff --git a/regression-test/suites/statistics/test_agg_complex_type.groovy 
b/regression-test/suites/statistics/test_agg_complex_type.groovy
new file mode 100644
index 00000000000..55af87f35bd
--- /dev/null
+++ b/regression-test/suites/statistics/test_agg_complex_type.groovy
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_analyze_with_agg_complex_type") {
+    sql """drop table if exists test_agg_complex_type;"""
+
+    sql """create table test_agg_complex_type (
+            datekey int,
+            device_id bitmap BITMAP_UNION NULL,
+                    hll_test hll hll_union,
+                    qs QUANTILE_STATE QUANTILE_UNION
+    )
+    aggregate key (datekey)
+    distributed by hash(datekey) buckets 1
+    properties(
+            "replication_num" = "1"
+    );"""
+
+    sql """insert into test_agg_complex_type values (1,to_bitmap(1), 
hll_hash("11"), TO_QUANTILE_STATE("11", 1.0));"""
+    
+    sql """insert into test_agg_complex_type values (2, to_bitmap(1),  
hll_hash("12"), TO_QUANTILE_STATE("11", 1.0));"""
+    
+    sql """ANALYZE TABLE test_agg_complex_type WITH SYNC"""
+
+    def show_result = sql """SHOW COLUMN CACHED STATS test_agg_complex_type"""
+
+    assert show_result.size() == 1
+
+    def expected_col_stats = { r, expected_value, idx ->
+        return (int) Double.parseDouble(r[0][idx]) == expected_value
+    }
+
+    assert expected_col_stats(show_result, 2, 1)
+    assert expected_col_stats(show_result, 0, 3)
+    assert expected_col_stats(show_result, 8, 4)
+    assert expected_col_stats(show_result, 4, 5)
+    assert expected_col_stats(show_result, 1, 6)
+    assert expected_col_stats(show_result, 2, 7)
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to