This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 3b9a2409b49 [improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) (#43087) 3b9a2409b49 is described below commit 3b9a2409b49b5d2c9cb8b6242151c643e4fe5bf1 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Sat Nov 2 08:38:55 2024 +0800 [improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) (#43087) backport: https://github.com/apache/doris/pull/43009 --- .../java/org/apache/doris/catalog/OlapTable.java | 8 +-- .../main/java/org/apache/doris/catalog/Table.java | 2 +- .../java/org/apache/doris/catalog/TableIf.java | 2 + .../doris/catalog/external/ExternalTable.java | 4 +- .../doris/catalog/external/HMSExternalTable.java | 7 +- .../catalog/external/PaimonExternalTable.java | 4 +- .../doris/datasource/ExternalRowCountCache.java | 6 +- .../doris/external/iceberg/util/IcebergUtils.java | 6 +- .../doris/statistics/StatisticsAutoCollector.java | 2 +- .../doris/statistics/util/StatisticsUtil.java | 22 +++--- .../datasource/ExternalRowCountCacheTest.java | 83 +++++++++++++++++++++- 11 files changed, 116 insertions(+), 30 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 3d14b92de07..9b95b1b20ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -119,8 +119,6 @@ public class OlapTable extends Table { WAITING_STABLE } - public static long ROW_COUNT_BEFORE_REPORT = -1; - private volatile OlapTableState state; // index id -> index meta @@ -1298,12 +1296,12 @@ public class OlapTable extends Table { if (index == null) { LOG.warn("Index {} not exist in partition {}, table {}, {}", indexId, entry.getValue().getName(), id, name); - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } if (strict && !index.getRowCountReported()) { - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } - rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount(); + rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 : index.getRowCount(); } return rowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 7cbef2a5d6c..17df068ff21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -614,6 +614,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf { @Override public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index c1bcf5b2179..4586ed62900 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -45,6 +45,8 @@ import java.util.concurrent.TimeUnit; public interface TableIf { Logger LOG = LogManager.getLogger(TableIf.class); + long UNKNOWN_ROW_COUNT = -1; + void readLock(); boolean tryReadLock(long timeout, TimeUnit unit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 90188e18fed..0a8ffce08ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -290,7 +290,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } // All external table should get external row count from cache. return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); @@ -302,7 +302,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { * This is called by ExternalRowCountCache to load row count cache. */ public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 4632433ee49..7910ed5af45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.HudiUtils; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -308,9 +309,9 @@ public class HMSExternalTable extends ExternalTable { break; default: LOG.warn("getRowCount for dlaType {} is not supported.", dlaType); - rowCount = -1; + rowCount = TableIf.UNKNOWN_ROW_COUNT; } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } @Override @@ -477,7 +478,7 @@ public class HMSExternalTable extends ExternalTable { // Get row count from hive metastore property. long rowCount = getRowCountFromExternalSource(); // Only hive table supports estimate row count by listing file. - if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { + if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) { LOG.debug("Will estimate row count from file list."); rowCount = StatisticsUtil.getRowCountFromFileList(this); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java index a28e7178dfb..b2bb165c471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java @@ -175,10 +175,10 @@ public class PaimonExternalTable extends ExternalTable { for (Split split : splits) { rowCount += split.rowCount(); } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e); } - return -1; + return UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 632cde1d5a7..68fed7c2f36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -94,7 +94,7 @@ public class ExternalRowCountCache { } /** - * Get cached row count for the given table. Return 0 if cached not loaded or table not exists. + * Get cached row count for the given table. Return -1 if cached not loaded or table not exists. * Cached will be loaded async. * @param catalogId * @param dbId @@ -106,12 +106,12 @@ public class ExternalRowCountCache { try { CompletableFuture<Optional<Long>> f = rowCountCache.get(key); if (f.isDone()) { - return f.get().orElse(0L); + return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT); } } catch (Exception e) { LOG.warn("Unexpected exception while returning row count", e); } - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java index 3f4779ef893..90695b94cc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java @@ -36,6 +36,7 @@ import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.Subquery; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -616,15 +617,16 @@ public class IcebergUtils { .getIcebergTable(catalog, dbName, tbName); Snapshot snapshot = icebergTable.currentSnapshot(); if (snapshot == null) { + LOG.info("Iceberg table {}.{}.{} is empty, return -1.", catalog.getName(), dbName, tbName); // empty table - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } Map<String, String> summary = snapshot.summary(); return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e); } - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 137068d4315..b4d8f1ad0ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -181,7 +181,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) { + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index fe32755be45..b959b9bd506 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -562,18 +562,19 @@ public class StatisticsUtil { public static long getHiveRowCount(HMSExternalTable table) { Map<String, String> parameters = table.getRemoteTable().getParameters(); if (parameters == null) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters contains row count, simply get and return it. if (parameters.containsKey(NUM_ROWS)) { long rows = Long.parseLong(parameters.get(NUM_ROWS)); // Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0. - if (rows != 0) { + if (rows > 0) { + LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName()); return rows; } } if (!parameters.containsKey(TOTAL_SIZE)) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE)); @@ -582,9 +583,13 @@ public class StatisticsUtil { estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - return -1; + LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize); + return TableIf.UNKNOWN_ROW_COUNT; } - return totalSize / estimatedRowSize; + long rows = totalSize / estimatedRowSize; + LOG.debug("Get row count {} for hive table {} by total size {} and row size {}", + rows, table.getName(), totalSize, estimatedRowSize); + return rows; } /** @@ -608,7 +613,7 @@ public class StatisticsUtil { */ public static long getRowCountFromFileList(HMSExternalTable table) { if (table.isView()) { - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } HiveMetaStoreCache.HivePartitionValues partitionValues = getPartitionValuesForTable(table); int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); @@ -635,12 +640,13 @@ public class StatisticsUtil { estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } if (samplePartitionSize < totalPartitionSize) { totalSize = totalSize * totalPartitionSize / samplePartitionSize; } - return totalSize / estimatedRowSize; + long rows = totalSize / estimatedRowSize; + return rows > 0 ? rows : TableIf.UNKNOWN_ROW_COUNT; } public static HiveMetaStoreCache.HivePartitionValues getPartitionValuesForTable(HMSExternalTable table) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java index e8622f6b59a..c9effa8cf81 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -17,9 +17,14 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.datasource.ExternalRowCountCache.RowCountCacheLoader; import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey; import org.apache.doris.statistics.BasicAsyncCacheLoader; +import mockit.Mock; +import mockit.MockUp; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,6 +32,8 @@ import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class ExternalRowCountCacheTest { @@ -66,14 +73,14 @@ public class ExternalRowCountCacheTest { public void test() throws Exception { // table 1 long rowCount = cache.getCachedRowCount(1, 1, 1); - Assertions.assertEquals(0, rowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount); Thread.sleep(1000); rowCount = cache.getCachedRowCount(1, 1, 1); Assertions.assertEquals(111, rowCount); // table 2 rowCount = cache.getCachedRowCount(1, 1, 2); - Assertions.assertEquals(0, rowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount); Thread.sleep(1000); rowCount = cache.getCachedRowCount(1, 1, 2); Assertions.assertEquals(222, rowCount); @@ -81,7 +88,7 @@ public class ExternalRowCountCacheTest { // table 3 rowCount = cache.getCachedRowCount(1, 1, 3); // first get, it should be 0 because the loader is async - Assertions.assertEquals(0, rowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount); // After sleep 2 sec and then get, it should be 1 Thread.sleep(2000); rowCount = cache.getCachedRowCount(1, 1, 3); @@ -97,4 +104,74 @@ public class ExternalRowCountCacheTest { // refresh done, value should be 2 Assertions.assertEquals(335, rowCount); } + + @Test + public void testLoadWithException() throws Exception { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( + 1, Integer.MAX_VALUE, "TEST", true); + AtomicInteger counter = new AtomicInteger(0); + + new MockUp<RowCountCacheLoader>() { + @Mock + protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return null; + } + }; + ExternalRowCountCache cache = new ExternalRowCountCache(executor, 2, null); + long cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 1) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(1, counter.get()); + + new MockUp<ExternalRowCountCache.RowCountCacheLoader>() { + @Mock + protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return Optional.of(100L); + } + }; + cache.getCachedRowCount(1, 1, 1); + for (int i = 0; i < 60; i++) { + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) { + Assertions.assertEquals(100, cachedRowCount); + break; + } + Thread.sleep(1000); + } + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(100, cachedRowCount); + Assertions.assertEquals(2, counter.get()); + + new MockUp<ExternalRowCountCache.RowCountCacheLoader>() { + @Mock + protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Optional.of(100L); + } + }; + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + Thread.sleep(1000); + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 3) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(3, counter.get()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org