This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch FixFlushQueryDeadLock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cfdf53066028fe5772f478cd355b8f00f7327220 Author: Jackie Tien <[email protected]> AuthorDate: Fri Aug 22 17:11:11 2025 +0800 Change QueryDataSource initialization from lock to tryLock, add more logging, and remove unnecessary synchronization (cherry picked from commit 1b377c049d80a5e83520746c0952a83f5f03bac0) --- .../queryengine/execution/driver/DataDriver.java | 44 ++- .../fragment/FakedFragmentInstanceContext.java | 5 +- .../fragment/FragmentInstanceContext.java | 245 ++++++++---- .../fragment/FragmentInstanceExecution.java | 8 +- .../queryengine/metric/QueryResourceMetricSet.java | 26 +- .../FragmentInstanceStatisticsDrawer.java | 11 + .../db/storageengine/dataregion/DataRegion.java | 411 +++++++++++++++------ .../dataregion/IDataRegionForQuery.java | 29 +- .../dataregion/VirtualDataRegion.java | 36 +- .../memtable/AbstractWritableMemChunk.java | 11 + .../dataregion/memtable/TsFileProcessor.java | 63 +++- .../dataregion/tsfile/TsFileManager.java | 41 ++ .../db/queryengine/execution/DataDriverTest.java | 11 +- .../src/main/thrift/datanode.thrift | 1 + 14 files changed, 693 insertions(+), 249 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java index a225793437f..743a05d3777 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java @@ -32,6 +32,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.util.List; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT; +import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE; /** * One {@link DataDriver} is responsible for one {@link FragmentInstance} which is for data query, @@ -40,7 +41,7 @@ import
static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.QUE @NotThreadSafe public class DataDriver extends Driver { - private boolean init; + private boolean init = false; // Unit : Byte private final long estimatedMemorySize; @@ -55,7 +56,13 @@ public class DataDriver extends Driver { protected boolean init(SettableFuture<?> blockedFuture) { if (!init) { try { - initialize(); + if (!initialize()) { // failed to init this time, but now exception thrown, possibly failed + // to acquire lock within the specific time + blockedFuture.set(null); + return false; + } else { + return true; + } } catch (Throwable t) { LOGGER.error( "Failed to do the initialization for driver {} ", driverContext.getDriverTaskID(), t); @@ -78,8 +85,7 @@ public class DataDriver extends Driver { * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} is null after * initialization, IllegalStateException will be thrown */ - private void initialize() throws QueryProcessException { - long startTime = System.nanoTime(); + private boolean initialize() throws QueryProcessException { try { List<DataSourceOperator> sourceOperators = ((DataDriverContext) driverContext).getSourceOperators(); @@ -90,20 +96,30 @@ public class DataDriver extends Driver { // for some reasons, we may get null QueryDataSource here. // And it's safe for us to throw this exception here in such case. throw new IllegalStateException("QueryDataSource should never be null!"); + } else if (dataSource == UNFINISHED_QUERY_DATA_SOURCE) { + // init query data source timeout. 
Maybe failed to acquire the read lock within the + // specified time + // do nothing, wait for next try + } else { + sourceOperators.forEach( + sourceOperator -> { + // Construct QueryDataSource for source operator + sourceOperator.initQueryDataSource(dataSource.clone()); + }); + this.init = true; } - sourceOperators.forEach( - sourceOperator -> { - // Construct QueryDataSource for source operator - sourceOperator.initQueryDataSource(dataSource.clone()); - }); + } else { + this.init = true; } - - this.init = true; } finally { - ((DataDriverContext) driverContext).clearSourceOperators(); - QUERY_EXECUTION_METRICS.recordExecutionCost( - QUERY_RESOURCE_INIT, System.nanoTime() - startTime); + if (this.init) { + ((DataDriverContext) driverContext).clearSourceOperators(); + QUERY_EXECUTION_METRICS.recordExecutionCost( + QUERY_RESOURCE_INIT, + driverContext.getFragmentInstanceContext().getInitQueryDataSourceCost()); + } } + return this.init; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java index a91e41d2379..a76ca60fd5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java @@ -48,7 +48,7 @@ public class FakedFragmentInstanceContext extends FragmentInstanceContext { public void initQueryDataSource(PartialPath sourcePath) throws QueryProcessException { - dataRegion.readLock(); + dataRegion.tryReadLock(Long.MAX_VALUE); try { this.sharedQueryDataSource = dataRegion.query( @@ -56,7 +56,8 @@ public class FakedFragmentInstanceContext extends FragmentInstanceContext { sourcePath.getDevice(), this, getGlobalTimeFilter(), - null); + null, + Long.MAX_VALUE); // used files 
should be added before mergeLock is unlocked, or they may be deleted by // running merge diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 5510394396d..cdb44db5f66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; +import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager; @@ -73,11 +74,12 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME; import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE; +import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE; public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); 
private static final long END_TIME_INITIAL_VALUE = -1L; // wait over 5s for driver to close is abnormal private static final long LONG_WAIT_DURATION = 5_000_000_000L; @@ -139,6 +141,7 @@ public class FragmentInstanceContext extends QueryContext { private TFetchFragmentInstanceStatisticsResp fragmentInstanceStatistics = null; private long initQueryDataSourceCost = 0; + private int initQueryDataSourceRetryCount = 0; private final AtomicLong readyQueueTime = new AtomicLong(0); private final AtomicLong blockQueueTime = new AtomicLong(0); private long unclosedSeqFileNum = 0; @@ -353,7 +356,7 @@ public class FragmentInstanceContext extends QueryContext { PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> allMods = loadAllModificationsFromDisk(resource); atomicReference.set(allMods); - if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) { + if (cachedModEntriesSize.get() >= CONFIG.getModsCacheSizeLimitPerFI()) { return null; } long memCost = @@ -361,7 +364,7 @@ public class FragmentInstanceContext extends QueryContext { + RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY; long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); while (alreadyUsedMemoryForCachedModEntries + memCost - < config.getModsCacheSizeLimitPerFI()) { + < CONFIG.getModsCacheSizeLimitPerFI()) { if (cachedModEntriesSize.compareAndSet( alreadyUsedMemoryForCachedModEntries, alreadyUsedMemoryForCachedModEntries + memCost)) { @@ -476,10 +479,6 @@ public class FragmentInstanceContext extends QueryContext { return dataNodeQueryContext; } - public void setDataNodeQueryContext(DataNodeQueryContext dataNodeQueryContext) { - this.dataNodeQueryContext = dataNodeQueryContext; - } - public FragmentInstanceInfo getInstanceInfo() { return getErrorCode() .map( @@ -535,11 +534,11 @@ public class FragmentInstanceContext extends QueryContext { memoryReservationManager.releaseAllReservedMemory(); } - public void initQueryDataSource(List<PartialPath> sourcePaths) 
throws QueryProcessException { + public boolean initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProcessException { long startTime = System.nanoTime(); if (sourcePaths == null || sourcePaths.isEmpty()) { this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE; - return; + return true; } String singleDevice = null; if (sourcePaths.size() == 1) { @@ -560,82 +559,136 @@ public class FragmentInstanceContext extends QueryContext { } } } - dataRegion.readLock(); - try { - this.sharedQueryDataSource = - dataRegion.query( - sourcePaths, - // when all the selected series are under the same device, the QueryDataSource will be - // filtered according to timeIndex - singleDevice, - this, - // time filter may be stateful, so we need to copy it - globalTimeFilter != null ? globalTimeFilter.copy() : null, - timePartitions); - - // used files should be added before mergeLock is unlocked, or they may be deleted by - // running merge - if (sharedQueryDataSource != null) { - closedFilePaths = new HashSet<>(); - unClosedFilePaths = new HashSet<>(); - addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource); - ((QueryDataSource) sharedQueryDataSource).setSingleDevice(singleDevice != null); + + long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs(); + long startAcquireLockTime = System.nanoTime(); + if (dataRegion.tryReadLock(waitForLockTime)) { + try { + // minus already consumed time + waitForLockTime -= (System.nanoTime() - startAcquireLockTime) / 1_000_000; + + // no remaining time slice + if (waitForLockTime <= 0) { + return false; + } + + this.sharedQueryDataSource = + dataRegion.query( + sourcePaths, + // when all the selected series are under the same device, the QueryDataSource will + // be + // filtered according to timeIndex + singleDevice, + this, + // time filter may be stateful, so we need to copy it + globalTimeFilter != null ? 
globalTimeFilter.copy() : null, + timePartitions, + waitForLockTime); + + // used files should be added before mergeLock is unlocked, or they may be deleted by + // running merge + if (sharedQueryDataSource != null) { + closedFilePaths = new HashSet<>(); + unClosedFilePaths = new HashSet<>(); + addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource); + ((QueryDataSource) sharedQueryDataSource).setSingleDevice(singleDevice != null); + return true; + } else { + // failed to acquire lock within the specific time + return false; + } + } finally { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + dataRegion.readUnlock(); } - } finally { - setInitQueryDataSourceCost(System.nanoTime() - startTime); - dataRegion.readUnlock(); + } else { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + return false; } } - public void initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext> devicePathsToContext) - throws QueryProcessException { + public boolean initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext> devicePathsToContext) { long startTime = System.nanoTime(); if (devicePathsToContext == null) { - return; + return true; } - dataRegion.readLock(); - try { - this.sharedQueryDataSource = - dataRegion.queryForDeviceRegionScan( - devicePathsToContext, - this, - globalTimeFilter != null ? 
globalTimeFilter.copy() : null, - timePartitions); - - if (sharedQueryDataSource != null) { - closedFilePaths = new HashSet<>(); - unClosedFilePaths = new HashSet<>(); - addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource); + + long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs(); + if (dataRegion.tryReadLock(waitForLockTime)) { + try { + // minus already consumed time + waitForLockTime -= (System.nanoTime() - startTime) / 1_000_000; + + // no remaining time slice + if (waitForLockTime <= 0) { + return false; + } + this.sharedQueryDataSource = + dataRegion.queryForDeviceRegionScan( + devicePathsToContext, + this, + globalTimeFilter != null ? globalTimeFilter.copy() : null, + timePartitions, + waitForLockTime); + + if (sharedQueryDataSource != null) { + closedFilePaths = new HashSet<>(); + unClosedFilePaths = new HashSet<>(); + addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource); + return true; + } else { + // failed to acquire lock within the specific time + return false; + } + } finally { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + dataRegion.readUnlock(); } - } finally { - setInitQueryDataSourceCost(System.nanoTime() - startTime); - dataRegion.readUnlock(); + } else { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + return false; } } - public void initRegionScanQueryDataSource(List<PartialPath> pathList) - throws QueryProcessException { + public boolean initRegionScanQueryDataSource(List<PartialPath> pathList) { long startTime = System.nanoTime(); if (pathList == null) { - return; + return true; } - dataRegion.readLock(); - try { - this.sharedQueryDataSource = - dataRegion.queryForSeriesRegionScan( - pathList, - this, - globalTimeFilter != null ? 
globalTimeFilter.copy() : null, - timePartitions); - - if (sharedQueryDataSource != null) { - closedFilePaths = new HashSet<>(); - unClosedFilePaths = new HashSet<>(); - addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource); + long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs(); + if (dataRegion.tryReadLock(waitForLockTime)) { + // minus already consumed time + waitForLockTime -= (System.nanoTime() - startTime) / 1_000_000; + + // no remaining time slice + if (waitForLockTime <= 0) { + return false; + } + try { + this.sharedQueryDataSource = + dataRegion.queryForSeriesRegionScan( + pathList, + this, + globalTimeFilter != null ? globalTimeFilter.copy() : null, + timePartitions, + waitForLockTime); + + if (sharedQueryDataSource != null) { + closedFilePaths = new HashSet<>(); + unClosedFilePaths = new HashSet<>(); + addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource); + return true; + } else { + // failed to acquire lock within the specific time + return false; + } + } finally { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + dataRegion.readUnlock(); } - } finally { - setInitQueryDataSourceCost(System.nanoTime() - startTime); - dataRegion.readUnlock(); + } else { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + return false; } } @@ -643,17 +696,26 @@ public class FragmentInstanceContext extends QueryContext { if (sharedQueryDataSource == null) { switch (queryDataSourceType) { case SERIES_SCAN: - initQueryDataSource(sourcePaths); - // Friendly for gc - sourcePaths = null; + if (initQueryDataSource(sourcePaths)) { + // Friendly for gc + sourcePaths = null; + } else { + return getUnfinishedQueryDataSource(); + } break; case DEVICE_REGION_SCAN: - initRegionScanQueryDataSource(devicePathsToContext); - devicePathsToContext = null; + if (initRegionScanQueryDataSource(devicePathsToContext)) { + devicePathsToContext = null; + } else { + return getUnfinishedQueryDataSource(); + 
} break; case TIME_SERIES_REGION_SCAN: - initRegionScanQueryDataSource(sourcePaths); - sourcePaths = null; + if (initRegionScanQueryDataSource(sourcePaths)) { + sourcePaths = null; + } else { + return getUnfinishedQueryDataSource(); + } break; default: throw new QueryProcessException( @@ -663,6 +725,18 @@ public class FragmentInstanceContext extends QueryContext { return sharedQueryDataSource; } + private IQueryDataSource getUnfinishedQueryDataSource() { + increaseInitQueryDataSourceRetryCount(); + // record warn log every 10 times retry + if (initQueryDataSourceRetryCount % 10 == 0) { + LOGGER.warn( + "Failed to acquire the read lock of DataRegion-{} for {} times", + dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(), + initQueryDataSourceRetryCount); + } + return UNFINISHED_QUERY_DATA_SOURCE; + } + /** Lock and check if tsFileResource is deleted */ private boolean processTsFileResource(TsFileResource tsFileResource, boolean isClosed) { addFilePathToMap(tsFileResource, isClosed); @@ -840,6 +914,9 @@ public class FragmentInstanceContext extends QueryContext { QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime); + QueryResourceMetricSet.getInstance() + .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount()); + SeriesScanCostMetricSet.getInstance() .recordBloomFilterMetrics( getQueryStatistics().getLoadBloomFilterFromCacheCount().get(), @@ -921,7 +998,7 @@ public class FragmentInstanceContext extends QueryContext { .updatePageReaderMemoryUsage(getQueryStatistics().getPageReaderMaxUsedMemorySize().get()); } - private synchronized void releaseDataNodeQueryContext() { + private void releaseDataNodeQueryContext() { if (dataNodeQueryContextMap == null) { // this process is in fetch schema, nothing need to release return; @@ -958,14 +1035,22 @@ public class FragmentInstanceContext extends QueryContext { return fragmentInstanceStatistics; } - public void setInitQueryDataSourceCost(long initQueryDataSourceCost) { - 
this.initQueryDataSourceCost = initQueryDataSourceCost; + public void addInitQueryDataSourceCost(long initQueryDataSourceCost) { + this.initQueryDataSourceCost += initQueryDataSourceCost; } public long getInitQueryDataSourceCost() { return initQueryDataSourceCost; } + public void increaseInitQueryDataSourceRetryCount() { + this.initQueryDataSourceRetryCount++; + } + + public int getInitQueryDataSourceRetryCount() { + return initQueryDataSourceRetryCount; + } + public void addReadyQueuedTime(long time) { readyQueueTime.addAndGet(time); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index 9a825a055c4..d8ce02aa3fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException; @@ -29,7 +30,6 @@ import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; -import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; import 
org.apache.iotdb.db.utils.SetThreadName; @@ -53,6 +53,7 @@ import static org.apache.iotdb.db.queryengine.statistics.StatisticsMergeUtil.mer public class FragmentInstanceExecution { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceExecution.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private final FragmentInstanceId instanceId; private final FragmentInstanceContext context; @@ -167,8 +168,8 @@ public class FragmentInstanceExecution { // We don't need to output the region having ExplainAnalyzeOperator only. return false; } - statistics.setDataRegion(((DataRegion) context.getDataRegion()).getDataRegionId()); - statistics.setIp(IoTDBDescriptor.getInstance().getConfig().getAddressAndPort().ip); + statistics.setDataRegion(context.getDataRegion().getDataRegionId()); + statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort()); statistics.setStartTimeInMS(context.getStartTime()); statistics.setEndTimeInMS( context.isEndTimeUpdate() ? 
context.getEndTime() : System.currentTimeMillis()); @@ -177,6 +178,7 @@ public class FragmentInstanceExecution { statistics.setReadyQueuedTime(context.getReadyQueueTime()); statistics.setInitDataQuerySourceCost(context.getInitQueryDataSourceCost()); + statistics.setInitDataQuerySourceRetryCount(context.getInitQueryDataSourceRetryCount()); statistics.setSeqClosednNum(context.getClosedSeqFileNum()); statistics.setSeqUnclosedNum(context.getUnclosedSeqFileNum()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java index 401fc2e0c8f..241486750a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java @@ -41,10 +41,12 @@ public class QueryResourceMetricSet implements IMetricSet { public static final String UNSEQUENCE_TSFILE = "unsequence_tsfile"; public static final String FLUSHING_MEMTABLE = "flushing_memtable"; public static final String WORKING_MEMTABLE = "working_memtable"; + public static final String INIT_QUERY_RESOURCE_RETRY_COUNT = "retry_count"; private Histogram sequenceTsFileHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram unsequenceTsFileHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram flushingMemTableHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram workingMemTableHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram retryCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; public void recordQueryResourceNum(String type, int count) { switch (type) { @@ -60,11 +62,22 @@ public class QueryResourceMetricSet implements IMetricSet { case WORKING_MEMTABLE: workingMemTableHistogram.update(count); break; + case 
INIT_QUERY_RESOURCE_RETRY_COUNT: + if (count > 0) { + retryCountHistogram.update(count); + } + break; default: break; } } + public void recordInitQueryResourceRetryCount(int count) { + if (count > 0) { + retryCountHistogram.update(count); + } + } + @Override public void bindTo(AbstractMetricService metricService) { sequenceTsFileHistogram = @@ -91,11 +104,22 @@ public class QueryResourceMetricSet implements IMetricSet { MetricLevel.IMPORTANT, Tag.TYPE.toString(), WORKING_MEMTABLE); + retryCountHistogram = + metricService.getOrCreateHistogram( + Metric.QUERY_RESOURCE.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + INIT_QUERY_RESOURCE_RETRY_COUNT); } @Override public void unbindFrom(AbstractMetricService metricService) { - Arrays.asList(SEQUENCE_TSFILE, UNSEQUENCE_TSFILE, FLUSHING_MEMTABLE, WORKING_MEMTABLE) + Arrays.asList( + SEQUENCE_TSFILE, + UNSEQUENCE_TSFILE, + FLUSHING_MEMTABLE, + WORKING_MEMTABLE, + INIT_QUERY_RESOURCE_RETRY_COUNT) .forEach( type -> metricService.remove( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java index 0141a86e627..b9f1a55b5bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java @@ -113,6 +113,17 @@ public class FragmentInstanceStatisticsDrawer { String.format( "Cost of initDataQuerySource: %.3f ms", statistics.getInitDataQuerySourceCost() * NS_TO_MS_FACTOR)); + + if (statistics.isSetInitDataQuerySourceRetryCount() + && statistics.getInitDataQuerySourceRetryCount() > 0) { + addLine( + singleFragmentInstanceArea, + 1, + String.format( + "Retry count of initDataQuerySource: %d", + statistics.getInitDataQuerySourceRetryCount())); + } + 
addLine( singleFragmentInstanceArea, 1, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index a04c81f4801..b68a7199436 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -234,11 +234,6 @@ public class DataRegion implements IDataRegionForQuery { /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */ private final Object closeStorageGroupCondition = new Object(); - /** - * Avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a read is executed. - */ - private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock(); - /** time partition id in the database -> {@link TsFileProcessor} for this time partition. */ private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>(); @@ -1956,8 +1951,8 @@ public class DataRegion implements IDataRegionForQuery { } } - /** used for query engine */ @Override + @TestOnly public QueryDataSource query( List<PartialPath> pathList, String singleDeviceId, @@ -1965,31 +1960,154 @@ public class DataRegion implements IDataRegionForQuery { Filter globalTimeFilter, List<Long> timePartitions) throws QueryProcessException { - try { - List<TsFileResource> seqResources = - getFileResourceListForQuery( - tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), - pathList, - singleDeviceId, - context, + return query( + pathList, singleDeviceId, context, globalTimeFilter, timePartitions, Long.MAX_VALUE); + } + + /** used for query engine */ + @Override + public QueryDataSource query( + List<PartialPath> pathList, + String singleDeviceId, + QueryContext context, + Filter globalTimeFilter, + List<Long> timePartitions, + long 
waitForLockTimeInMs) + throws QueryProcessException { + + Pair<List<TsFileResource>, List<TsFileResource>> pair = + tsFileManager.getAllTsFileListForQuery(timePartitions, globalTimeFilter); + + List<TsFileResource> seqTsFileResouceList = pair.left; + List<TsFileResource> unSeqTsFileResouceList = pair.right; + + List<TsFileProcessor> needToUnLockList = new ArrayList<>(); + + boolean success = + tryGetFLushLock( + waitForLockTimeInMs, + singleDeviceId, + globalTimeFilter, + context.isDebug(), + seqTsFileResouceList, + unSeqTsFileResouceList, + needToUnLockList); + + if (success) { + try { + List<TsFileResource> satisfiedSeqResourceList = + getFileResourceListForQuery( + seqTsFileResouceList, pathList, singleDeviceId, context, globalTimeFilter, true); + + List<TsFileResource> satisfiedUnSeqResourceList = + getFileResourceListForQuery( + unSeqTsFileResouceList, pathList, singleDeviceId, context, globalTimeFilter, false); + + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + SEQUENCE_TSFILE, satisfiedSeqResourceList.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, satisfiedUnSeqResourceList.size()); + return new QueryDataSource(satisfiedSeqResourceList, satisfiedUnSeqResourceList); + } catch (MetadataException e) { + throw new QueryProcessException(e); + } finally { + clearAlreadyLockedList(needToUnLockList); + } + } else { + // means that failed to acquire lock within the specific time + return null; + } + } + + /** + * try to get flush lock for each unclosed satisfied tsfile + * + * @return true if lock successfully, otherwise false if return false, needToUnLockList will + * always be empty because this method is responsible for unlocking all the already-acquiring + * lock if return true, the caller is responsible for unlocking all the already-acquiring lock + * in needToUnLockList + */ + private boolean tryGetFLushLock( + long waitTimeInMs, + String singleDeviceId, + Filter globalTimeFilter, + boolean isDebug, + List<TsFileResource> 
seqResources, + List<TsFileResource> unSeqResources, + List<TsFileProcessor> needToUnLockList) { + // deal with seq resources + for (TsFileResource tsFileResource : seqResources) { + // only need to acquire flush lock for those unclosed and satisfied tsfile + if (!tsFileResource.isClosed() + && tsFileResource.isSatisfied( + singleDeviceId == null ? null : new PlainDeviceID(singleDeviceId), globalTimeFilter, - true); - List<TsFileResource> unseqResources = - getFileResourceListForQuery( - tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), - pathList, - singleDeviceId, - context, + true, + isDebug)) { + TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); + try { + long startTime = System.nanoTime(); + if (tsFileProcessor.tryReadLock(waitTimeInMs)) { + // minus already consumed time + waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000; + + needToUnLockList.add(tsFileProcessor); + // no remaining time slice + if (waitTimeInMs <= 0) { + clearAlreadyLockedList(needToUnLockList); + return false; + } + } else { + clearAlreadyLockedList(needToUnLockList); + return false; + } + } catch (InterruptedException e) { + clearAlreadyLockedList(needToUnLockList); + Thread.currentThread().interrupt(); + return false; + } + } + } + // deal with unSeq resources + for (TsFileResource tsFileResource : unSeqResources) { + if (!tsFileResource.isClosed() + && tsFileResource.isSatisfied( + singleDeviceId == null ? 
null : new PlainDeviceID(singleDeviceId), globalTimeFilter, - false); - - QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqResources.size()); - QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, unseqResources.size()); + false, + isDebug)) { + TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); + try { + long startTime = System.nanoTime(); + if (tsFileProcessor.tryReadLock(waitTimeInMs)) { + // minus already consumed time + waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000; + + needToUnLockList.add(tsFileProcessor); + // no remaining time slice + if (waitTimeInMs <= 0) { + clearAlreadyLockedList(needToUnLockList); + return false; + } + } else { + clearAlreadyLockedList(needToUnLockList); + return false; + } + } catch (InterruptedException e) { + clearAlreadyLockedList(needToUnLockList); + Thread.currentThread().interrupt(); + return false; + } + } + } + return true; + } - return new QueryDataSource(seqResources, unseqResources); - } catch (MetadataException e) { - throw new QueryProcessException(e); + private void clearAlreadyLockedList(List<TsFileProcessor> needToUnLockList) { + for (TsFileProcessor processor : needToUnLockList) { + processor.readUnLock(); } + needToUnLockList.clear(); } @Override @@ -1997,31 +2115,51 @@ public class DataRegion implements IDataRegionForQuery { List<PartialPath> pathList, QueryContext queryContext, Filter globalTimeFilter, - List<Long> timePartitions) - throws QueryProcessException { - try { - List<IFileScanHandle> seqFileScanHandles = - getFileHandleListForQuery( - tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), - pathList, - queryContext, - globalTimeFilter, - true); - List<IFileScanHandle> unseqFileScanHandles = - getFileHandleListForQuery( - tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), - pathList, - queryContext, - globalTimeFilter, - false); - - QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqFileScanHandles.size()); - QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( - UNSEQUENCE_TSFILE, unseqFileScanHandles.size()); + List<Long> timePartitions, + long waitForLockTimeInMs) { + Pair<List<TsFileResource>, List<TsFileResource>> pair = + tsFileManager.getAllTsFileListForQuery(timePartitions, globalTimeFilter); + + List<TsFileResource> seqTsFileResouceList = pair.left; + List<TsFileResource> unSeqTsFileResouceList = pair.right; + + List<TsFileProcessor> needToUnLockList = new ArrayList<>(); + + boolean success = + tryGetFLushLock( + waitForLockTimeInMs, + null, + globalTimeFilter, + queryContext.isDebug(), + seqTsFileResouceList, + unSeqTsFileResouceList, + needToUnLockList); + + if (success) { + try { + List<IFileScanHandle> seqFileScanHandles = + getFileHandleListForQuery( + seqTsFileResouceList, pathList, queryContext, globalTimeFilter, true); + + List<IFileScanHandle> unSeqFileScanHandles = + getFileHandleListForQuery( + unSeqTsFileResouceList, // same snapshot whose processors tryGetFLushLock locked + pathList, + queryContext, + globalTimeFilter, + false); - return new QueryDataSourceForRegionScan(seqFileScanHandles, unseqFileScanHandles); - } catch (MetadataException e) { - throw new QueryProcessException(e); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + SEQUENCE_TSFILE, seqFileScanHandles.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, unSeqFileScanHandles.size()); + return new QueryDataSourceForRegionScan(seqFileScanHandles, unSeqFileScanHandles); + } finally { + clearAlreadyLockedList(needToUnLockList); + } + } else { + // means that failed to acquire lock within the specific time + return null; } } @@ -2030,25 +2168,19 @@ public class DataRegion implements IDataRegionForQuery { List<PartialPath> partialPaths, QueryContext context, Filter globalTimeFilter, - boolean isSeq) - throws MetadataException { + boolean isSeq) { List<IFileScanHandle> fileScanHandles = new ArrayList<>(); for (TsFileResource tsFileResource :
tsFileResources) { if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, context.isDebug())) { continue; } - closeQueryLock.readLock().lock(); - try { - if (tsFileResource.isClosed()) { - fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); - } else { - tsFileResource - .getProcessor() - .queryForSeriesRegionScan(partialPaths, context, fileScanHandles); - } - } finally { - closeQueryLock.readLock().unlock(); + if (tsFileResource.isClosed()) { + fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); + } else { + tsFileResource + .getProcessor() + .queryForSeriesRegionScanWithoutLock(partialPaths, context, fileScanHandles); } } return fileScanHandles; @@ -2059,31 +2191,52 @@ public class DataRegion implements IDataRegionForQuery { Map<IDeviceID, DeviceContext> devicePathToAligned, QueryContext queryContext, Filter globalTimeFilter, - List<Long> timePartitions) - throws QueryProcessException { - try { - List<IFileScanHandle> seqFileScanHandles = - getFileHandleListForQuery( - tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), - devicePathToAligned, - queryContext, - globalTimeFilter, - true); - List<IFileScanHandle> unseqFileScanHandles = - getFileHandleListForQuery( - tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), - devicePathToAligned, - queryContext, - globalTimeFilter, - false); + List<Long> timePartitions, + long waitForLockTimeInMs) { + + Pair<List<TsFileResource>, List<TsFileResource>> pair = + tsFileManager.getAllTsFileListForQuery(timePartitions, globalTimeFilter); + + List<TsFileResource> seqTsFileResouceList = pair.left; + List<TsFileResource> unSeqTsFileResouceList = pair.right; + + List<TsFileProcessor> needToUnLockList = new ArrayList<>(); - QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqFileScanHandles.size()); - QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( - UNSEQUENCE_TSFILE, unseqFileScanHandles.size()); + boolean success = + 
tryGetFLushLock( + waitForLockTimeInMs, + null, + globalTimeFilter, + queryContext.isDebug(), + seqTsFileResouceList, + unSeqTsFileResouceList, + needToUnLockList); - return new QueryDataSourceForRegionScan(seqFileScanHandles, unseqFileScanHandles); - } catch (MetadataException e) { - throw new QueryProcessException(e); + if (success) { + try { + List<IFileScanHandle> seqFileScanHandles = + getFileHandleListForQuery( + seqTsFileResouceList, devicePathToAligned, queryContext, globalTimeFilter, true); + + List<IFileScanHandle> unSeqFileScanHandles = + getFileHandleListForQuery( + unSeqTsFileResouceList, // same snapshot whose processors tryGetFLushLock locked + devicePathToAligned, + queryContext, + globalTimeFilter, + false); + + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + SEQUENCE_TSFILE, seqFileScanHandles.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, unSeqFileScanHandles.size()); + return new QueryDataSourceForRegionScan(seqFileScanHandles, unSeqFileScanHandles); + } finally { + clearAlreadyLockedList(needToUnLockList); + } + } else { + // means that failed to acquire lock within the specific time + return null; } } @@ -2092,25 +2245,19 @@ public class DataRegion implements IDataRegionForQuery { Map<IDeviceID, DeviceContext> devicePathsToContext, QueryContext context, Filter globalTimeFilter, - boolean isSeq) - throws MetadataException { + boolean isSeq) { List<IFileScanHandle> fileScanHandles = new ArrayList<>(); for (TsFileResource tsFileResource : tsFileResources) { if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, context.isDebug())) { continue; } - closeQueryLock.readLock().lock(); - try { - if (tsFileResource.isClosed()) { - fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); - } else { - tsFileResource - .getProcessor() - .queryForDeviceRegionScan(devicePathsToContext, context, fileScanHandles); - } - } finally { - closeQueryLock.readLock().unlock(); + if (tsFileResource.isClosed()) { +
fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); + } else { + tsFileResource + .getProcessor() + .queryForDeviceRegionScanWithoutLock(devicePathsToContext, context, fileScanHandles); } } return fileScanHandles; @@ -2118,11 +2265,45 @@ public class DataRegion implements IDataRegionForQuery { /** lock the read lock of the insert lock */ @Override - public void readLock() { - // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable - insertLock.readLock().lock(); + public boolean tryReadLock(long waitMillis) { + try { + // apply read lock for SG insert lock to prevent inconsistent with concurrently writing + // memtable + long startTime = System.nanoTime(); + if (insertLock.readLock().tryLock(waitMillis, TimeUnit.MILLISECONDS)) { + // minus already consumed time + waitMillis -= (System.nanoTime() - startTime) / 1_000_000; + // no remaining time slice + if (waitMillis <= 0) { + insertLock.readLock().unlock(); + return false; + } + return tryGetTsFileManagerReadLock(waitMillis); + } else { + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + private boolean tryGetTsFileManagerReadLock(long waitMillis) { // apply read lock for TsFileResource list - tsFileManager.readLock(); + try { + if (tsFileManager.tryReadLock(waitMillis)) { + return true; + } else { + // failed to acquire tsFileManager read lock, we also need to unlock the insertLock + insertLock.readLock().unlock(); + return false; + } + } catch (InterruptedException e) { + // failed to acquire tsFileManager read lock, we also need to unlock the insertLock + insertLock.readLock().unlock(); + Thread.currentThread().interrupt(); + return false; + } } /** unlock the read lock of insert lock */ @@ -2157,15 +2338,6 @@ public class DataRegion implements IDataRegionForQuery { boolean isSeq) throws MetadataException { - if (context.isDebug()) { - DEBUG_LOGGER.info( - "Path: {}, get tsfile 
list: {} isSeq: {} time filter: {}", - pathList, - tsFileResources, - isSeq, - (globalTimeFilter == null ? "null" : globalTimeFilter)); - } - List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); for (TsFileResource tsFileResource : tsFileResources) { @@ -2176,19 +2348,16 @@ public class DataRegion implements IDataRegionForQuery { context.isDebug())) { continue; } - closeQueryLock.readLock().lock(); try { if (tsFileResource.isClosed()) { tsfileResourcesForQuery.add(tsFileResource); } else { tsFileResource .getProcessor() - .query(pathList, context, tsfileResourcesForQuery, globalTimeFilter); + .queryWithoutLock(pathList, context, tsfileResourcesForQuery, globalTimeFilter); } } catch (IOException e) { throw new MetadataException(e); - } finally { - closeQueryLock.readLock().unlock(); } } return tsfileResourcesForQuery; @@ -2660,9 +2829,9 @@ public class DataRegion implements IDataRegionForQuery { isValidateTsFileFailed = !TsFileValidator.getInstance().validateTsFile(tsFileProcessor.getTsFileResource()); } - closeQueryLock.writeLock().lock(); + tsFileProcessor.writeLock(); try { - tsFileProcessor.close(); + tsFileProcessor.closeWithoutSettingResourceStatus(); if (isEmptyFile) { tsFileProcessor.getTsFileResource().remove(); } else if (isValidateTsFileFailed) { @@ -2671,10 +2840,11 @@ public class DataRegion implements IDataRegionForQuery { renameAndHandleError( tsFilePath + RESOURCE_SUFFIX, tsFilePath + RESOURCE_SUFFIX + BROKEN_SUFFIX); } else { + tsFileProcessor.getTsFileResource().setStatus(TsFileResourceStatus.NORMAL); tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource()); } } finally { - closeQueryLock.writeLock().unlock(); + tsFileProcessor.writeUnlock(); } if (isEmptyFile || isValidateTsFileFailed) { tsFileManager.remove(tsFileProcessor.getTsFileResource(), tsFileProcessor.isSequence()); @@ -3424,6 +3594,7 @@ public class DataRegion implements IDataRegionForQuery { return tsFileManager.getTsFileList(false); } + 
@Override public String getDataRegionId() { return dataRegionId; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java index c18262145e5..14c6db513fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; @@ -34,12 +35,12 @@ import java.util.Map; /** It's an interface that storage engine must provide for query engine */ public interface IDataRegionForQuery { - /** lock the read lock for thread-safe */ - void readLock(); + boolean tryReadLock(long waitMillis); void readUnlock(); /** Get satisfied QueryDataSource from DataRegion for seriesScan */ + @TestOnly QueryDataSource query( List<PartialPath> pathList, String singleDeviceId, @@ -48,21 +49,37 @@ public interface IDataRegionForQuery { List<Long> timePartitions) throws QueryProcessException; + /** + * Get satisfied QueryDataSource from DataRegion for seriesScan + * + * @return null means that failed to acquire lock within the specific time + */ + QueryDataSource query( + List<PartialPath> pathList, + String singleDeviceId, + QueryContext context, + Filter globalTimeFilter, + List<Long> timePartitions, + long waitForLockTimeInMs) + throws QueryProcessException; + /** Get satisfied QueryDataSource from DataRegion for regionScan */ IQueryDataSource queryForDeviceRegionScan( Map<IDeviceID, DeviceContext> 
devicePathsToContext, QueryContext queryContext, Filter globalTimeFilter, - List<Long> timePartitions) - throws QueryProcessException; + List<Long> timePartitions, + long waitForLockTimeInMs); IQueryDataSource queryForSeriesRegionScan( List<PartialPath> pathList, QueryContext queryContext, Filter globalTimeFilter, - List<Long> timePartitions) - throws QueryProcessException; + List<Long> timePartitions, + long waitForLockTimeInMs); /** Get database name of this DataRegion */ String getDatabaseName(); + + String getDataRegionId(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java index c71111324c4..9e2f53b6b18 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java @@ -41,9 +41,14 @@ public class VirtualDataRegion implements IDataRegionForQuery { private static final String VIRTUAL_DB_NAME = "root.__virtual"; + private static final String VIRTUAL_DATA_REGION_ID = "virtual_data_region"; + public static final QueryDataSource EMPTY_QUERY_DATA_SOURCE = new QueryDataSource(Collections.emptyList(), Collections.emptyList()); + public static final QueryDataSource UNFINISHED_QUERY_DATA_SOURCE = + new QueryDataSource(Collections.emptyList(), Collections.emptyList()); + private static final QueryDataSourceForRegionScan EMPTY_REGION_QUERY_DATA_SOURCE = new QueryDataSourceForRegionScan(Collections.emptyList(), Collections.emptyList()); @@ -52,8 +57,9 @@ public class VirtualDataRegion implements IDataRegionForQuery { } @Override - public void readLock() { - // do nothing, because itself is thread-safe already + public boolean tryReadLock(long waitMillis) { + // do nothing, always return true + return true; } @Override @@ -69,6 +75,19 @@ public class 
VirtualDataRegion implements IDataRegionForQuery { Filter globalTimeFilter, List<Long> timePartitions) throws QueryProcessException { + return query( + pathList, singleDeviceId, context, globalTimeFilter, timePartitions, Long.MAX_VALUE); + } + + @Override + public QueryDataSource query( + List<PartialPath> pathList, + String singleDeviceId, + QueryContext context, + Filter globalTimeFilter, + List<Long> timePartitions, + long waitForLockTimeInMs) + throws QueryProcessException { return EMPTY_QUERY_DATA_SOURCE; } @@ -77,8 +96,8 @@ public class VirtualDataRegion implements IDataRegionForQuery { Map<IDeviceID, DeviceContext> devicePathsToContext, QueryContext queryContext, Filter globalTimeFilter, - List<Long> timePartitions) - throws QueryProcessException { + List<Long> timePartitions, + long waitForLockTimeInMs) { return EMPTY_REGION_QUERY_DATA_SOURCE; } @@ -87,8 +106,8 @@ public class VirtualDataRegion implements IDataRegionForQuery { List<PartialPath> pathList, QueryContext queryContext, Filter globalTimeFilter, - List<Long> timePartitions) - throws QueryProcessException { + List<Long> timePartitions, + long waitForLockTimeInMs) { return EMPTY_REGION_QUERY_DATA_SOURCE; } @@ -97,6 +116,11 @@ public class VirtualDataRegion implements IDataRegionForQuery { return VIRTUAL_DB_NAME; } + @Override + public String getDataRegionId() { + return VIRTUAL_DATA_REGION_ID; + } + private static class InstanceHolder { private InstanceHolder() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index f797613b078..58aabe4a5ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ 
-32,12 +32,16 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.chunk.IChunkWriter; import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; public abstract class AbstractWritableMemChunk implements IWritableMemChunk { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWritableMemChunk.class); + protected static long RETRY_INTERVAL_MS = 100L; protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; @@ -53,11 +57,18 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { protected void maybeReleaseTvList(TVList tvList) { long startTimeInMs = System.currentTimeMillis(); boolean succeed = false; + int retryCount = 0; while (!succeed) { try { tryReleaseTvList(tvList); succeed = true; } catch (MemoryNotEnoughException ex) { + // print log every 5 seconds + if (retryCount % 50 == 0) { + LOGGER.warn( + "Failed to transfer tvlist memory owner to query engine, {}", ex.getMessage()); + } + retryCount++; long waitQueryInMs = System.currentTimeMillis() - startTimeInMs; if (waitQueryInMs > MAX_WAIT_QUERY_MS) { // Abort first query in the list. 
When all queries in the list have been aborted, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 34070a848c9..2d58015a596 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1633,10 +1633,10 @@ public class TsFileProcessor { } /** Close this tsfile */ - public void close() throws TsFileProcessorException { + public void closeWithoutSettingResourceStatus() throws TsFileProcessorException { try { // When closing resource file, its corresponding mod file is also closed. - tsFileResource.close(); + tsFileResource.closeWithoutSettingStatus(); } catch (IOException e) { throw new TsFileProcessorException(e); } @@ -1820,7 +1820,7 @@ public class TsFileProcessor { return new ArrayList<>(alignedChunkMetadataForOneDevice); } - public void queryForSeriesRegionScan( + public void queryForSeriesRegionScanWithoutLock( List<PartialPath> pathList, QueryContext queryContext, List<IFileScanHandle> fileScanHandlesForQuery) { @@ -1829,7 +1829,6 @@ public class TsFileProcessor { Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap = new HashMap<>(); Map<IDeviceID, Map<String, List<IChunkMetadata>>> deviceToChunkMetadataListMap = new HashMap<>(); - flushQueryLock.readLock().lock(); try { for (PartialPath seriesPath : pathList) { Map<String, List<IChunkMetadata>> measurementToChunkMetaList = new HashMap<>(); @@ -1884,9 +1883,6 @@ public class TsFileProcessor { QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, flushingMemTables.size()); QUERY_RESOURCE_METRICS.recordQueryResourceNum( WORKING_MEMTABLE, workMemTable != null ? 
1 : 0); - - flushQueryLock.readLock().unlock(); - logFlushQueryReadUnlocked(); } if (!deviceToMemChunkHandleMap.isEmpty() || !deviceToChunkMetadataListMap.isEmpty()) { fileScanHandlesForQuery.add( @@ -1903,7 +1899,7 @@ public class TsFileProcessor { * Construct IFileScanHandle for data in memtable and the other ones in flushing memtables. Then * get the related ChunkMetadata of data on disk. */ - public void queryForDeviceRegionScan( + public void queryForDeviceRegionScanWithoutLock( Map<IDeviceID, DeviceContext> devicePathsToContext, QueryContext queryContext, List<IFileScanHandle> fileScanHandlesForQuery) { @@ -1912,7 +1908,6 @@ public class TsFileProcessor { Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap = new HashMap<>(); Map<IDeviceID, Map<String, List<IChunkMetadata>>> deviceToChunkMetadataListMap = new HashMap<>(); - flushQueryLock.readLock().lock(); try { for (Map.Entry<IDeviceID, DeviceContext> entry : devicePathsToContext.entrySet()) { IDeviceID deviceID = entry.getKey(); @@ -1968,9 +1963,6 @@ public class TsFileProcessor { QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, flushingMemTables.size()); QUERY_RESOURCE_METRICS.recordQueryResourceNum( WORKING_MEMTABLE, workMemTable != null ? 
1 : 0); - - flushQueryLock.readLock().unlock(); - logFlushQueryReadUnlocked(); } if (!deviceToMemChunkHandleMap.isEmpty() || !deviceToChunkMetadataListMap.isEmpty()) { @@ -1991,18 +1983,40 @@ public class TsFileProcessor { * * @param seriesPaths selected paths */ + public void queryWithoutLock( + List<PartialPath> seriesPaths, + QueryContext context, + List<TsFileResource> tsfileResourcesForQuery, + Filter globalTimeFilter) + throws IOException { + query(seriesPaths, context, tsfileResourcesForQuery, globalTimeFilter, false); + } + + @TestOnly public void query( List<PartialPath> seriesPaths, QueryContext context, List<TsFileResource> tsfileResourcesForQuery, Filter globalTimeFilter) throws IOException { + query(seriesPaths, context, tsfileResourcesForQuery, globalTimeFilter, true); + } + + private void query( + List<PartialPath> seriesPaths, + QueryContext context, + List<TsFileResource> tsfileResourcesForQuery, + Filter globalTimeFilter, + boolean needLock) + throws IOException { long startTime = System.nanoTime(); try { Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new HashMap<>(); Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new HashMap<>(); - flushQueryLock.readLock().lock(); + if (needLock) { + flushQueryLock.readLock().lock(); + } try { for (PartialPath seriesPath : seriesPaths) { List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>(); @@ -2047,9 +2061,10 @@ public class TsFileProcessor { QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, flushingMemTables.size()); QUERY_RESOURCE_METRICS.recordQueryResourceNum( WORKING_MEMTABLE, workMemTable != null ? 
1 : 0); - - flushQueryLock.readLock().unlock(); - logFlushQueryReadUnlocked(); + if (needLock) { + flushQueryLock.readLock().unlock(); + logFlushQueryReadUnlocked(); + } } if (!pathToReadOnlyMemChunkMap.isEmpty() || !pathToChunkMetadataListMap.isEmpty()) { @@ -2159,6 +2174,22 @@ public class TsFileProcessor { return flushingMemTables; } + public void writeLock() { + flushQueryLock.writeLock().lock(); + } + + public void writeUnlock() { + flushQueryLock.writeLock().unlock(); + } + + public boolean tryReadLock(long waitInMs) throws InterruptedException { + return flushQueryLock.readLock().tryLock(waitInMs, TimeUnit.MILLISECONDS); + } + + public void readUnLock() { + flushQueryLock.readLock().unlock(); + } + private void logFlushQueryWriteLocked() { if (logger.isDebugEnabled()) { logger.debug( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index fb7d3303576..1c831534991 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeInd import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.utils.Pair; import java.io.IOException; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -96,6 +98,41 @@ public class TsFileManager { } } + /** + * don't need to acquire 
lock again, caller should guarantee the lock has been acquired + * + * @return left is seq resource list, right is unSeq resource list + */ + public Pair<List<TsFileResource>, List<TsFileResource>> getAllTsFileListForQuery( + List<Long> timePartitions, Filter timeFilter) { + List<TsFileResource> seq = new ArrayList<>(); + List<TsFileResource> unSeq = new ArrayList<>(); + if (timePartitions == null) { + for (Map.Entry<Long, TsFileResourceList> entry : sequenceFiles.entrySet()) { + if (TimePartitionUtils.satisfyTimePartition(timeFilter, entry.getKey())) { + seq.addAll(entry.getValue().getArrayList()); + } + } + for (Map.Entry<Long, TsFileResourceList> entry : unsequenceFiles.entrySet()) { + if (TimePartitionUtils.satisfyTimePartition(timeFilter, entry.getKey())) { + unSeq.addAll(entry.getValue().getArrayList()); + } + } + } else { + for (Long timePartitionId : timePartitions) { + TsFileResourceList tsFileResources = sequenceFiles.get(timePartitionId); + if (tsFileResources != null) { + seq.addAll(tsFileResources.getArrayList()); + } + tsFileResources = unsequenceFiles.get(timePartitionId); + if (tsFileResources != null) { + unSeq.addAll(tsFileResources.getArrayList()); + } + } + } + return new Pair<>(seq, unSeq); + } + public List<TsFileResource> getTsFileListSnapshot(long timePartition, boolean sequence) { readLock(); try { @@ -345,6 +382,10 @@ public class TsFileManager { resourceListLock.readLock().lock(); } + public boolean tryReadLock(long waitMillis) throws InterruptedException { + return resourceListLock.readLock().tryLock(waitMillis, TimeUnit.MILLISECONDS); + } + public void readUnlock() { resourceListLock.readLock().unlock(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java index 4920f39a645..1f092e4c97c 100644 --- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java @@ -74,6 +74,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.eq; public class DataDriverTest { @@ -91,6 +92,7 @@ public class DataDriverTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { + IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000); SeriesReaderTestUtil.setUp( measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG); } @@ -116,6 +118,7 @@ public class DataDriverTest { FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true); FragmentInstanceContext fragmentInstanceContext = createFragmentInstanceContext(instanceId, stateMachine); fragmentInstanceContext.setDataRegion(dataRegion); @@ -174,10 +177,16 @@ public class DataDriverTest { LimitOperator limitOperator = new LimitOperator(driverContext.getOperatorContexts().get(3), 250, timeJoinOperator); + fragmentInstanceContext.setSourcePaths(driverContext.getPaths()); String deviceId = DATA_DRIVER_TEST_SG + ".device0"; Mockito.when( dataRegion.query( - driverContext.getPaths(), deviceId, fragmentInstanceContext, null, null)) + eq(driverContext.getPaths()), + eq(deviceId), + eq(fragmentInstanceContext), + Mockito.isNull(), + Mockito.isNull(), + Mockito.anyLong())) .thenReturn(new QueryDataSource(seqResources, unSeqResources)); fragmentInstanceContext.initQueryDataSource(driverContext.getPaths()); fragmentInstanceContext.initializeNumOfDrivers(1); diff --git 
a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index f7a9c4c3854..da5fac70664 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -660,6 +660,7 @@ struct TFetchFragmentInstanceStatisticsResp { 14: optional i64 blockQueuedTime 15: optional string ip 16: optional string state + 17: optional i32 initDataQuerySourceRetryCount } /**
