This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b009cc001ed [fix](statistics)Fix partition table health rate evaluation bug. (#35924) b009cc001ed is described below commit b009cc001edf065c524ad05658373abbbecce136 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Jun 5 22:31:22 2024 +0800 [fix](statistics)Fix partition table health rate evaluation bug. (#35924) Older Doris versions don't support partition-level stats collection, so when users upgrade their cluster, the metadata doesn't contain partition-level update rows for each table. In this case, the auto collector assumes the table's partition stats have never been collected, which triggers a new collection. Since partition-level update rows are only updated after a load operation, if the user stops loading, auto analyze will keep collecting this table forever. This PR fixes the problem by setting the partition update rows to 0 for tables created by older versions. --- .../java/org/apache/doris/statistics/AnalysisInfo.java | 4 ++-- .../org/apache/doris/statistics/AnalysisManager.java | 17 ++++++++++++----- .../org/apache/doris/statistics/BaseAnalysisTask.java | 15 ++++++++++++--- .../org/apache/doris/statistics/TableStatsMeta.java | 10 +++++++++- .../apache/doris/statistics/util/StatisticsUtil.java | 18 ++++++++---------- 5 files changed, 43 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index d55d4ccd940..c02455b6dca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -35,11 +35,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.text.ParseException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.StringJoiner; +import 
java.util.concurrent.ConcurrentHashMap; public class AnalysisInfo implements Writable { @@ -191,7 +191,7 @@ public class AnalysisInfo implements Writable { @SerializedName("updateRows") public final long updateRows; - public final Map<Long, Long> partitionUpdateRows = new HashMap(); + public final Map<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>(); @SerializedName("tblUpdateTime") public final long tblUpdateTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 4b9c7129be6..7e1123e422d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -391,6 +391,7 @@ public class AnalysisManager implements Writable { if (jobInfo.scheduleType == ScheduleType.PERIOD && jobInfo.lastExecTimeInMs > 0) { return; } + // TODO: why create a new info object? AnalysisInfoBuilder jobInfoBuilder = new AnalysisInfoBuilder(jobInfo); AnalysisInfo analysisInfo = jobInfoBuilder.setTaskId(-1).build(); replayCreateAnalysisJob(analysisInfo); @@ -1119,6 +1120,9 @@ public class AnalysisManager implements Writable { OlapTable olapTable = (OlapTable) table; short replicaNum = olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum(); Map<Long, Long> tabletRows = record.getValue(); + if (tabletRows == null) { + continue; + } long tableUpdateRows = 0; for (Entry<Long, Long> entry : tabletRows.entrySet()) { tableUpdateRows += entry.getValue() / replicaNum; @@ -1151,13 +1155,16 @@ public class AnalysisManager implements Writable { } protected void updatePartitionRows(OlapTable table, Map<Long, Long> originTabletToRows, - TableStatsMeta statsStatus, short replicaNum) { + TableStatsMeta tableStats, short replicaNum) { + if (!table.isPartitionedTable()) { + return; + } List<Partition> partitions = table.getPartitions().stream().sorted( 
Comparator.comparing(Partition::getVisibleVersionTime).reversed()).collect(Collectors.toList()); Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows); int tabletCount = tabletToRows.size(); - if (statsStatus.partitionUpdateRows == null) { - statsStatus.partitionUpdateRows = new ConcurrentHashMap<>(); + if (tableStats.partitionUpdateRows == null) { + tableStats.partitionUpdateRows = new ConcurrentHashMap<>(); } for (Partition p : partitions) { MaterializedIndex baseIndex = p.getBaseIndex(); @@ -1170,9 +1177,9 @@ public class AnalysisManager implements Writable { continue; } long tabletRows = entry.getValue(); - statsStatus.partitionUpdateRows.computeIfPresent(p.getId(), + tableStats.partitionUpdateRows.computeIfPresent(p.getId(), (id, rows) -> rows += tabletRows / replicaNum); - statsStatus.partitionUpdateRows.putIfAbsent(p.getId(), tabletRows / replicaNum); + tableStats.partitionUpdateRows.putIfAbsent(p.getId(), tabletRows / replicaNum); iterator.remove(); tabletCount--; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 466d1ff6947..020cd6d9d81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -360,7 +360,8 @@ public abstract class BaseAnalysisTask { Set<String> partitionNames = tbl.getPartitionNames(); List<String> sqls = Lists.newArrayList(); int count = 0; - TableStatsMeta tableStatsStatus = Env.getServingEnv().getAnalysisManager().findTableStatsStatus(tbl.getId()); + AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = analysisManager.findTableStatsStatus(tbl.getId()); for (String part : partitionNames) { Partition partition = tbl.getPartition(part); params.put("partId", partition == null ? 
"-1" : String.valueOf(partition.getId())); @@ -372,9 +373,17 @@ public abstract class BaseAnalysisTask { ColStatsMeta columnStatsMeta = tableStatsStatus.findColumnStatsMeta(idxName, col.getName()); if (columnStatsMeta != null && columnStatsMeta.partitionUpdateRows != null) { ConcurrentMap<Long, Long> columnUpdateRows = columnStatsMeta.partitionUpdateRows; + + // findJobInfo will return null when doing sync analyzing. + AnalysisInfo jobInfo = analysisManager.findJobInfo(job.getJobInfo().jobId); + jobInfo = jobInfo == null ? job.jobInfo : jobInfo; + // For cluster upgrade compatible (older version metadata doesn't have partition update rows map) + // and insert before first analyze, set partition update rows to 0. + jobInfo.partitionUpdateRows.putIfAbsent(partition.getId(), 0L); + long id = partition.getId(); - if (Objects.equals(tableUpdateRows.get(id), columnUpdateRows.get(id))) { - LOG.info("Partition {} doesn't change after last analyze for column {}, skip it.", + if (Objects.equals(tableUpdateRows.getOrDefault(id, 0L), columnUpdateRows.get(id))) { + LOG.debug("Partition {} doesn't change after last analyze for column {}, skip it.", part, col.getName()); continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 7c2a01c7e11..ba425352f6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -42,7 +43,7 @@ import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -public class TableStatsMeta implements Writable { +public class TableStatsMeta implements Writable, GsonPostProcessable { @SerializedName("tblId") public final long tblId; @@ -179,4 +180,11 @@ public class TableStatsMeta implements Writable { public void convertDeprecatedColStatsToNewVersion() { deprecatedColNameToColStatsMeta = null; } + + @Override + public void gsonPostProcess() throws IOException { + if (partitionUpdateRows == null) { + partitionUpdateRows = new ConcurrentHashMap<>(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 1c6457e9521..190b8886488 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -984,7 +984,7 @@ public class StatisticsUtil { } // Partition table partition stats never been collected. if (StatisticsUtil.enablePartitionAnalyze() && table.isPartitionedTable() - && (columnStatsMeta.partitionUpdateRows == null || columnStatsMeta.partitionUpdateRows.isEmpty())) { + && columnStatsMeta.partitionUpdateRows == null) { return true; } if (table instanceof OlapTable) { @@ -1061,16 +1061,14 @@ public class StatisticsUtil { if (!partitionUpdateRows.containsKey(id)) { return true; } - long currentUpdateRows = tableStatsStatus.partitionUpdateRows.get(id); + long currentUpdateRows = tableStatsStatus.partitionUpdateRows.getOrDefault(id, 0L); long lastUpdateRows = partitionUpdateRows.get(id); - if (currentUpdateRows != 0) { - long changedRows = currentUpdateRows - lastUpdateRows; - if (changedRows > 0) { - changedPartitions++; - // Too much partition changed, need to reanalyze. 
- if (changedPartitions > UPDATED_PARTITION_THRESHOLD) { - return true; - } + long changedRows = currentUpdateRows - lastUpdateRows; + if (changedRows > 0) { + changedPartitions++; + // Too much partition changed, need to reanalyze. + if (changedPartitions > UPDATED_PARTITION_THRESHOLD) { + return true; } double changeRate = ((double) changedRows) / currentUpdateRows; // One partition changed too much, need to reanalyze. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org