This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b009cc001ed [fix](statistics)Fix partition table health rate 
evaluation bug. (#35924)
b009cc001ed is described below

commit b009cc001edf065c524ad05658373abbbecce136
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Jun 5 22:31:22 2024 +0800

    [fix](statistics)Fix partition table health rate evaluation bug. (#35924)
    
    Older Doris versions don't support partition stats collection, so
    when users upgrade their cluster, the metadata doesn't contain
    partition-level update rows for each table. In this case, the auto
    collector assumes the table has never collected partition stats, which
    triggers a new collection. Since the partition-level update rows are
    only updated after a load operation, if the user stops loading, auto
    analyze will keep re-collecting this table forever.
    This PR fixes the problem by setting the partition update rows to 0
    for tables created under the old version.
---
 .../java/org/apache/doris/statistics/AnalysisInfo.java |  4 ++--
 .../org/apache/doris/statistics/AnalysisManager.java   | 17 ++++++++++++-----
 .../org/apache/doris/statistics/BaseAnalysisTask.java  | 15 ++++++++++++---
 .../org/apache/doris/statistics/TableStatsMeta.java    | 10 +++++++++-
 .../apache/doris/statistics/util/StatisticsUtil.java   | 18 ++++++++----------
 5 files changed, 43 insertions(+), 21 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index d55d4ccd940..c02455b6dca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -35,11 +35,11 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.text.ParseException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class AnalysisInfo implements Writable {
 
@@ -191,7 +191,7 @@ public class AnalysisInfo implements Writable {
     @SerializedName("updateRows")
     public final long updateRows;
 
-    public final Map<Long, Long> partitionUpdateRows = new HashMap();
+    public final Map<Long, Long> partitionUpdateRows = new 
ConcurrentHashMap<>();
 
     @SerializedName("tblUpdateTime")
     public final long tblUpdateTime;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 4b9c7129be6..7e1123e422d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -391,6 +391,7 @@ public class AnalysisManager implements Writable {
         if (jobInfo.scheduleType == ScheduleType.PERIOD && 
jobInfo.lastExecTimeInMs > 0) {
             return;
         }
+        // TODO: why create a new info object?
         AnalysisInfoBuilder jobInfoBuilder = new AnalysisInfoBuilder(jobInfo);
         AnalysisInfo analysisInfo = jobInfoBuilder.setTaskId(-1).build();
         replayCreateAnalysisJob(analysisInfo);
@@ -1119,6 +1120,9 @@ public class AnalysisManager implements Writable {
                     OlapTable olapTable = (OlapTable) table;
                     short replicaNum = 
olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
                     Map<Long, Long> tabletRows = record.getValue();
+                    if (tabletRows == null) {
+                        continue;
+                    }
                     long tableUpdateRows = 0;
                     for (Entry<Long, Long> entry : tabletRows.entrySet()) {
                         tableUpdateRows += entry.getValue() / replicaNum;
@@ -1151,13 +1155,16 @@ public class AnalysisManager implements Writable {
     }
 
     protected void updatePartitionRows(OlapTable table, Map<Long, Long> 
originTabletToRows,
-                                       TableStatsMeta statsStatus, short 
replicaNum) {
+                                       TableStatsMeta tableStats, short 
replicaNum) {
+        if (!table.isPartitionedTable()) {
+            return;
+        }
         List<Partition> partitions = table.getPartitions().stream().sorted(
                 
Comparator.comparing(Partition::getVisibleVersionTime).reversed()).collect(Collectors.toList());
         Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows);
         int tabletCount = tabletToRows.size();
-        if (statsStatus.partitionUpdateRows == null) {
-            statsStatus.partitionUpdateRows = new ConcurrentHashMap<>();
+        if (tableStats.partitionUpdateRows == null) {
+            tableStats.partitionUpdateRows = new ConcurrentHashMap<>();
         }
         for (Partition p : partitions) {
             MaterializedIndex baseIndex = p.getBaseIndex();
@@ -1170,9 +1177,9 @@ public class AnalysisManager implements Writable {
                     continue;
                 }
                 long tabletRows = entry.getValue();
-                statsStatus.partitionUpdateRows.computeIfPresent(p.getId(),
+                tableStats.partitionUpdateRows.computeIfPresent(p.getId(),
                         (id, rows) -> rows += tabletRows / replicaNum);
-                statsStatus.partitionUpdateRows.putIfAbsent(p.getId(), 
tabletRows / replicaNum);
+                tableStats.partitionUpdateRows.putIfAbsent(p.getId(), 
tabletRows / replicaNum);
                 iterator.remove();
                 tabletCount--;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 466d1ff6947..020cd6d9d81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -360,7 +360,8 @@ public abstract class BaseAnalysisTask {
         Set<String> partitionNames = tbl.getPartitionNames();
         List<String> sqls = Lists.newArrayList();
         int count = 0;
-        TableStatsMeta tableStatsStatus = 
Env.getServingEnv().getAnalysisManager().findTableStatsStatus(tbl.getId());
+        AnalysisManager analysisManager = 
Env.getServingEnv().getAnalysisManager();
+        TableStatsMeta tableStatsStatus = 
analysisManager.findTableStatsStatus(tbl.getId());
         for (String part : partitionNames) {
             Partition partition = tbl.getPartition(part);
             params.put("partId", partition == null ? "-1" : 
String.valueOf(partition.getId()));
@@ -372,9 +373,17 @@ public abstract class BaseAnalysisTask {
                 ColStatsMeta columnStatsMeta = 
tableStatsStatus.findColumnStatsMeta(idxName, col.getName());
                 if (columnStatsMeta != null && 
columnStatsMeta.partitionUpdateRows != null) {
                     ConcurrentMap<Long, Long> columnUpdateRows = 
columnStatsMeta.partitionUpdateRows;
+
+                    // findJobInfo will return null when doing sync analyzing.
+                    AnalysisInfo jobInfo = 
analysisManager.findJobInfo(job.getJobInfo().jobId);
+                    jobInfo = jobInfo == null ? job.jobInfo : jobInfo;
+                    // For cluster upgrade compatible (older version metadata 
doesn't have partition update rows map)
+                    // and insert before first analyze, set partition update 
rows to 0.
+                    jobInfo.partitionUpdateRows.putIfAbsent(partition.getId(), 
0L);
+
                     long id = partition.getId();
-                    if (Objects.equals(tableUpdateRows.get(id), 
columnUpdateRows.get(id))) {
-                        LOG.info("Partition {} doesn't change after last 
analyze for column {}, skip it.",
+                    if (Objects.equals(tableUpdateRows.getOrDefault(id, 0L), 
columnUpdateRows.get(id))) {
+                        LOG.debug("Partition {} doesn't change after last 
analyze for column {}, skip it.",
                                 part, col.getName());
                         continue;
                     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 7c2a01c7e11..ba425352f6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -42,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-public class TableStatsMeta implements Writable {
+public class TableStatsMeta implements Writable, GsonPostProcessable {
 
     @SerializedName("tblId")
     public final long tblId;
@@ -179,4 +180,11 @@ public class TableStatsMeta implements Writable {
     public void convertDeprecatedColStatsToNewVersion() {
         deprecatedColNameToColStatsMeta = null;
     }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        if (partitionUpdateRows == null) {
+            partitionUpdateRows = new ConcurrentHashMap<>();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 1c6457e9521..190b8886488 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -984,7 +984,7 @@ public class StatisticsUtil {
         }
         // Partition table partition stats never been collected.
         if (StatisticsUtil.enablePartitionAnalyze() && 
table.isPartitionedTable()
-                && (columnStatsMeta.partitionUpdateRows == null || 
columnStatsMeta.partitionUpdateRows.isEmpty())) {
+                && columnStatsMeta.partitionUpdateRows == null) {
             return true;
         }
         if (table instanceof OlapTable) {
@@ -1061,16 +1061,14 @@ public class StatisticsUtil {
             if (!partitionUpdateRows.containsKey(id)) {
                 return true;
             }
-            long currentUpdateRows = 
tableStatsStatus.partitionUpdateRows.get(id);
+            long currentUpdateRows = 
tableStatsStatus.partitionUpdateRows.getOrDefault(id, 0L);
             long lastUpdateRows = partitionUpdateRows.get(id);
-            if (currentUpdateRows != 0) {
-                long changedRows = currentUpdateRows - lastUpdateRows;
-                if (changedRows > 0) {
-                    changedPartitions++;
-                    // Too much partition changed, need to reanalyze.
-                    if (changedPartitions > UPDATED_PARTITION_THRESHOLD) {
-                        return true;
-                    }
+            long changedRows = currentUpdateRows - lastUpdateRows;
+            if (changedRows > 0) {
+                changedPartitions++;
+                // Too much partition changed, need to reanalyze.
+                if (changedPartitions > UPDATED_PARTITION_THRESHOLD) {
+                    return true;
                 }
                 double changeRate = ((double) changedRows) / currentUpdateRows;
                 // One partition changed too much, need to reanalyze.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to