This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new bae30752517 [To dev/1.3] Change Query Data Source init way from lock
to tryLock & Add more log and remove useless synchronize
bae30752517 is described below
commit bae307525177e05c8c8b802f743f80248e138ff0
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Aug 26 13:57:23 2025 +0800
[To dev/1.3] Change Query Data Source init way from lock to tryLock & Add
more log and remove useless synchronize
(cherry picked from commit 1b377c049d80a5e83520746c0952a83f5f03bac0)
---
.../queryengine/execution/driver/DataDriver.java | 44 ++-
.../fragment/FakedFragmentInstanceContext.java | 5 +-
.../fragment/FragmentInstanceContext.java | 245 ++++++++----
.../fragment/FragmentInstanceExecution.java | 8 +-
.../queryengine/metric/QueryResourceMetricSet.java | 26 +-
.../FragmentInstanceStatisticsDrawer.java | 11 +
.../db/storageengine/dataregion/DataRegion.java | 411 +++++++++++++++------
.../dataregion/IDataRegionForQuery.java | 29 +-
.../dataregion/VirtualDataRegion.java | 36 +-
.../memtable/AbstractWritableMemChunk.java | 11 +
.../dataregion/memtable/TsFileProcessor.java | 63 +++-
.../dataregion/tsfile/TsFileManager.java | 41 ++
.../db/queryengine/execution/DataDriverTest.java | 11 +-
.../src/main/thrift/datanode.thrift | 1 +
14 files changed, 693 insertions(+), 249 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index a225793437f..743a05d3777 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.List;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
+import static
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
/**
* One {@link DataDriver} is responsible for one {@link FragmentInstance}
which is for data query,
@@ -40,7 +41,7 @@ import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.QUE
@NotThreadSafe
public class DataDriver extends Driver {
- private boolean init;
+ private boolean init = false;
// Unit : Byte
private final long estimatedMemorySize;
@@ -55,7 +56,13 @@ public class DataDriver extends Driver {
protected boolean init(SettableFuture<?> blockedFuture) {
if (!init) {
try {
- initialize();
+ if (!initialize()) { // failed to init this time, but no exception was thrown; possibly failed
+ // to acquire the lock within the specified time
+ blockedFuture.set(null);
+ return false;
+ } else {
+ return true;
+ }
} catch (Throwable t) {
LOGGER.error(
"Failed to do the initialization for driver {} ",
driverContext.getDriverTaskID(), t);
@@ -78,8 +85,7 @@ public class DataDriver extends Driver {
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} is
null after
* initialization, IllegalStateException will be thrown
*/
- private void initialize() throws QueryProcessException {
- long startTime = System.nanoTime();
+ private boolean initialize() throws QueryProcessException {
try {
List<DataSourceOperator> sourceOperators =
((DataDriverContext) driverContext).getSourceOperators();
@@ -90,20 +96,30 @@ public class DataDriver extends Driver {
// for some reasons, we may get null QueryDataSource here.
// And it's safe for us to throw this exception here in such case.
throw new IllegalStateException("QueryDataSource should never be
null!");
+ } else if (dataSource == UNFINISHED_QUERY_DATA_SOURCE) {
+ // initializing the query data source timed out, probably because the read lock could not be
+ // acquired within the specified time; do nothing and wait for the next try
+ } else {
+ sourceOperators.forEach(
+ sourceOperator -> {
+ // Construct QueryDataSource for source operator
+ sourceOperator.initQueryDataSource(dataSource.clone());
+ });
+ this.init = true;
}
- sourceOperators.forEach(
- sourceOperator -> {
- // Construct QueryDataSource for source operator
- sourceOperator.initQueryDataSource(dataSource.clone());
- });
+ } else {
+ this.init = true;
}
-
- this.init = true;
} finally {
- ((DataDriverContext) driverContext).clearSourceOperators();
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+ if (this.init) {
+ ((DataDriverContext) driverContext).clearSourceOperators();
+ QUERY_EXECUTION_METRICS.recordExecutionCost(
+ QUERY_RESOURCE_INIT,
+
driverContext.getFragmentInstanceContext().getInitQueryDataSourceCost());
+ }
}
+ return this.init;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
index a91e41d2379..a76ca60fd5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
@@ -48,7 +48,7 @@ public class FakedFragmentInstanceContext extends
FragmentInstanceContext {
public void initQueryDataSource(PartialPath sourcePath) throws
QueryProcessException {
- dataRegion.readLock();
+ dataRegion.tryReadLock(Long.MAX_VALUE);
try {
this.sharedQueryDataSource =
dataRegion.query(
@@ -56,7 +56,8 @@ public class FakedFragmentInstanceContext extends
FragmentInstanceContext {
sourcePath.getDevice(),
this,
getGlobalTimeFilter(),
- null);
+ null,
+ Long.MAX_VALUE);
// used files should be added before mergeLock is unlocked, or they may
be deleted by
// running merge
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 5510394396d..cdb44db5f66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
+import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
@@ -73,11 +74,12 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
import static
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE;
+import static
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
public class FragmentInstanceContext extends QueryContext {
private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceContext.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final long END_TIME_INITIAL_VALUE = -1L;
// wait over 5s for driver to close is abnormal
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
@@ -139,6 +141,7 @@ public class FragmentInstanceContext extends QueryContext {
private TFetchFragmentInstanceStatisticsResp fragmentInstanceStatistics =
null;
private long initQueryDataSourceCost = 0;
+ private int initQueryDataSourceRetryCount = 0;
private final AtomicLong readyQueueTime = new AtomicLong(0);
private final AtomicLong blockQueueTime = new AtomicLong(0);
private long unclosedSeqFileNum = 0;
@@ -353,7 +356,7 @@ public class FragmentInstanceContext extends QueryContext {
PatternTreeMap<Modification,
PatternTreeMapFactory.ModsSerializer> allMods =
loadAllModificationsFromDisk(resource);
atomicReference.set(allMods);
- if (cachedModEntriesSize.get() >=
config.getModsCacheSizeLimitPerFI()) {
+ if (cachedModEntriesSize.get() >=
CONFIG.getModsCacheSizeLimitPerFI()) {
return null;
}
long memCost =
@@ -361,7 +364,7 @@ public class FragmentInstanceContext extends QueryContext {
+
RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
long alreadyUsedMemoryForCachedModEntries =
cachedModEntriesSize.get();
while (alreadyUsedMemoryForCachedModEntries + memCost
- < config.getModsCacheSizeLimitPerFI()) {
+ < CONFIG.getModsCacheSizeLimitPerFI()) {
if (cachedModEntriesSize.compareAndSet(
alreadyUsedMemoryForCachedModEntries,
alreadyUsedMemoryForCachedModEntries + memCost)) {
@@ -476,10 +479,6 @@ public class FragmentInstanceContext extends QueryContext {
return dataNodeQueryContext;
}
- public void setDataNodeQueryContext(DataNodeQueryContext
dataNodeQueryContext) {
- this.dataNodeQueryContext = dataNodeQueryContext;
- }
-
public FragmentInstanceInfo getInstanceInfo() {
return getErrorCode()
.map(
@@ -535,11 +534,11 @@ public class FragmentInstanceContext extends QueryContext
{
memoryReservationManager.releaseAllReservedMemory();
}
- public void initQueryDataSource(List<PartialPath> sourcePaths) throws
QueryProcessException {
+ public boolean initQueryDataSource(List<PartialPath> sourcePaths) throws
QueryProcessException {
long startTime = System.nanoTime();
if (sourcePaths == null || sourcePaths.isEmpty()) {
this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE;
- return;
+ return true;
}
String singleDevice = null;
if (sourcePaths.size() == 1) {
@@ -560,82 +559,136 @@ public class FragmentInstanceContext extends
QueryContext {
}
}
}
- dataRegion.readLock();
- try {
- this.sharedQueryDataSource =
- dataRegion.query(
- sourcePaths,
- // when all the selected series are under the same device, the
QueryDataSource will be
- // filtered according to timeIndex
- singleDevice,
- this,
- // time filter may be stateful, so we need to copy it
- globalTimeFilter != null ? globalTimeFilter.copy() : null,
- timePartitions);
-
- // used files should be added before mergeLock is unlocked, or they may
be deleted by
- // running merge
- if (sharedQueryDataSource != null) {
- closedFilePaths = new HashSet<>();
- unClosedFilePaths = new HashSet<>();
- addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
- ((QueryDataSource) sharedQueryDataSource).setSingleDevice(singleDevice
!= null);
+
+ long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+ long startAcquireLockTime = System.nanoTime();
+ if (dataRegion.tryReadLock(waitForLockTime)) {
+ try {
+ // minus already consumed time
+ waitForLockTime -= (System.nanoTime() - startAcquireLockTime) /
1_000_000;
+
+ // no remaining time slice
+ if (waitForLockTime <= 0) {
+ return false;
+ }
+
+ this.sharedQueryDataSource =
+ dataRegion.query(
+ sourcePaths,
+ // when all the selected series are under the same device, the
QueryDataSource will
+ // be
+ // filtered according to timeIndex
+ singleDevice,
+ this,
+ // time filter may be stateful, so we need to copy it
+ globalTimeFilter != null ? globalTimeFilter.copy() : null,
+ timePartitions,
+ waitForLockTime);
+
+ // used files should be added before mergeLock is unlocked, or they
may be deleted by
+ // running merge
+ if (sharedQueryDataSource != null) {
+ closedFilePaths = new HashSet<>();
+ unClosedFilePaths = new HashSet<>();
+ addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
+ ((QueryDataSource)
sharedQueryDataSource).setSingleDevice(singleDevice != null);
+ return true;
+ } else {
+ // failed to acquire the lock(s) within the specified time
+ return false;
+ }
+ } finally {
+ addInitQueryDataSourceCost(System.nanoTime() - startTime);
+ dataRegion.readUnlock();
}
- } finally {
- setInitQueryDataSourceCost(System.nanoTime() - startTime);
- dataRegion.readUnlock();
+ } else {
+ addInitQueryDataSourceCost(System.nanoTime() - startTime);
+ return false;
}
}
- public void initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext>
devicePathsToContext)
- throws QueryProcessException {
+ public boolean initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext>
devicePathsToContext) {
long startTime = System.nanoTime();
if (devicePathsToContext == null) {
- return;
+ return true;
}
- dataRegion.readLock();
- try {
- this.sharedQueryDataSource =
- dataRegion.queryForDeviceRegionScan(
- devicePathsToContext,
- this,
- globalTimeFilter != null ? globalTimeFilter.copy() : null,
- timePartitions);
-
- if (sharedQueryDataSource != null) {
- closedFilePaths = new HashSet<>();
- unClosedFilePaths = new HashSet<>();
- addUsedFilesForRegionQuery((QueryDataSourceForRegionScan)
sharedQueryDataSource);
+
+ long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+ if (dataRegion.tryReadLock(waitForLockTime)) {
+ try {
+ // minus already consumed time
+ waitForLockTime -= (System.nanoTime() - startTime) / 1_000_000;
+
+ // no remaining time slice
+ if (waitForLockTime <= 0) {
+ return false;
+ }
+ this.sharedQueryDataSource =
+ dataRegion.queryForDeviceRegionScan(
+ devicePathsToContext,
+ this,
+ globalTimeFilter != null ? globalTimeFilter.copy() : null,
+ timePartitions,
+ waitForLockTime);
+
+ if (sharedQueryDataSource != null) {
+ closedFilePaths = new HashSet<>();
+ unClosedFilePaths = new HashSet<>();
+ addUsedFilesForRegionQuery((QueryDataSourceForRegionScan)
sharedQueryDataSource);
+ return true;
+ } else {
+ // failed to acquire the lock(s) within the specified time
+ return false;
+ }
+ } finally {
+ addInitQueryDataSourceCost(System.nanoTime() - startTime);
+ dataRegion.readUnlock();
}
- } finally {
- setInitQueryDataSourceCost(System.nanoTime() - startTime);
- dataRegion.readUnlock();
+ } else {
+ addInitQueryDataSourceCost(System.nanoTime() - startTime);
+ return false;
}
}
- public void initRegionScanQueryDataSource(List<PartialPath> pathList)
- throws QueryProcessException {
+ public boolean initRegionScanQueryDataSource(List<PartialPath> pathList) {
long startTime = System.nanoTime();
if (pathList == null) {
- return;
+ return true;
}
- dataRegion.readLock();
- try {
- this.sharedQueryDataSource =
- dataRegion.queryForSeriesRegionScan(
- pathList,
- this,
- globalTimeFilter != null ? globalTimeFilter.copy() : null,
- timePartitions);
-
- if (sharedQueryDataSource != null) {
- closedFilePaths = new HashSet<>();
- unClosedFilePaths = new HashSet<>();
- addUsedFilesForRegionQuery((QueryDataSourceForRegionScan)
sharedQueryDataSource);
+ long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+ if (dataRegion.tryReadLock(waitForLockTime)) {
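+ // minus already consumed time
+ // NOTE(review): unlike the sibling init methods, this check and its early return sit before
+ // the try/finally below, so returning false here skips dataRegion.readUnlock() (leaking the
+ // read lock) and skips addInitQueryDataSourceCost; move them inside the try block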
+ waitForLockTime -= (System.nanoTime() - startTime) / 1_000_000;
+
+ // no remaining time slice
+ if (waitForLockTime <= 0) {
+ return false;
+ }
+ try {
+ this.sharedQueryDataSource =
+ dataRegion.queryForSeriesRegionScan(
+ pathList,
+ this,
+ globalTimeFilter != null ? globalTimeFilter.copy() : null,
+ timePartitions,
+ waitForLockTime);
+
+ if (sharedQueryDataSource != null) {
+ closedFilePaths = new HashSet<>();
+ unClosedFilePaths = new HashSet<>();
+ addUsedFilesForRegionQuery((QueryDataSourceForRegionScan)
sharedQueryDataSource);
+ return true;
+ } else {
+ // failed to acquire the lock(s) within the specified time
+ return false;
+ }
+ } finally {
+ addInitQueryDataSourceCost(System.nanoTime() - startTime);
+ dataRegion.readUnlock();
}
- } finally {
- setInitQueryDataSourceCost(System.nanoTime() - startTime);
- dataRegion.readUnlock();
+ } else {
+ addInitQueryDataSourceCost(System.nanoTime() - startTime);
+ return false;
}
}
@@ -643,17 +696,26 @@ public class FragmentInstanceContext extends QueryContext
{
if (sharedQueryDataSource == null) {
switch (queryDataSourceType) {
case SERIES_SCAN:
- initQueryDataSource(sourcePaths);
- // Friendly for gc
- sourcePaths = null;
+ if (initQueryDataSource(sourcePaths)) {
+ // Friendly for gc
+ sourcePaths = null;
+ } else {
+ return getUnfinishedQueryDataSource();
+ }
break;
case DEVICE_REGION_SCAN:
- initRegionScanQueryDataSource(devicePathsToContext);
- devicePathsToContext = null;
+ if (initRegionScanQueryDataSource(devicePathsToContext)) {
+ devicePathsToContext = null;
+ } else {
+ return getUnfinishedQueryDataSource();
+ }
break;
case TIME_SERIES_REGION_SCAN:
- initRegionScanQueryDataSource(sourcePaths);
- sourcePaths = null;
+ if (initRegionScanQueryDataSource(sourcePaths)) {
+ sourcePaths = null;
+ } else {
+ return getUnfinishedQueryDataSource();
+ }
break;
default:
throw new QueryProcessException(
@@ -663,6 +725,18 @@ public class FragmentInstanceContext extends QueryContext {
return sharedQueryDataSource;
}
+ private IQueryDataSource getUnfinishedQueryDataSource() {
+ increaseInitQueryDataSourceRetryCount();
+ // log a warning once every 10 retries
+ if (initQueryDataSourceRetryCount % 10 == 0) {
+ LOGGER.warn(
+ "Failed to acquire the read lock of DataRegion-{} for {} times",
+ dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(),
+ initQueryDataSourceRetryCount);
+ }
+ return UNFINISHED_QUERY_DATA_SOURCE;
+ }
+
/** Lock and check if tsFileResource is deleted */
private boolean processTsFileResource(TsFileResource tsFileResource, boolean
isClosed) {
addFilePathToMap(tsFileResource, isClosed);
@@ -840,6 +914,9 @@ public class FragmentInstanceContext extends QueryContext {
QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
+ QueryResourceMetricSet.getInstance()
+ .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
+
SeriesScanCostMetricSet.getInstance()
.recordBloomFilterMetrics(
getQueryStatistics().getLoadBloomFilterFromCacheCount().get(),
@@ -921,7 +998,7 @@ public class FragmentInstanceContext extends QueryContext {
.updatePageReaderMemoryUsage(getQueryStatistics().getPageReaderMaxUsedMemorySize().get());
}
- private synchronized void releaseDataNodeQueryContext() {
+ private void releaseDataNodeQueryContext() {
if (dataNodeQueryContextMap == null) {
// this process is in fetch schema, nothing need to release
return;
@@ -958,14 +1035,22 @@ public class FragmentInstanceContext extends
QueryContext {
return fragmentInstanceStatistics;
}
- public void setInitQueryDataSourceCost(long initQueryDataSourceCost) {
- this.initQueryDataSourceCost = initQueryDataSourceCost;
+ public void addInitQueryDataSourceCost(long initQueryDataSourceCost) {
+ this.initQueryDataSourceCost += initQueryDataSourceCost;
}
public long getInitQueryDataSourceCost() {
return initQueryDataSourceCost;
}
+ public void increaseInitQueryDataSourceRetryCount() {
+ this.initQueryDataSourceRetryCount++;
+ }
+
+ public int getInitQueryDataSourceRetryCount() {
+ return initQueryDataSourceRetryCount;
+ }
+
public void addReadyQueuedTime(long time) {
readyQueueTime.addAndGet(time);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index 9a825a055c4..d8ce02aa3fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -29,7 +30,6 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -53,6 +53,7 @@ import static
org.apache.iotdb.db.queryengine.statistics.StatisticsMergeUtil.mer
public class FragmentInstanceExecution {
private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceExecution.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
@@ -167,8 +168,8 @@ public class FragmentInstanceExecution {
// We don't need to output the region having ExplainAnalyzeOperator only.
return false;
}
- statistics.setDataRegion(((DataRegion)
context.getDataRegion()).getDataRegionId());
-
statistics.setIp(IoTDBDescriptor.getInstance().getConfig().getAddressAndPort().ip);
+ statistics.setDataRegion(context.getDataRegion().getDataRegionId());
+ statistics.setIp(CONFIG.getInternalAddress() + ":" +
CONFIG.getInternalPort());
statistics.setStartTimeInMS(context.getStartTime());
statistics.setEndTimeInMS(
context.isEndTimeUpdate() ? context.getEndTime() :
System.currentTimeMillis());
@@ -177,6 +178,7 @@ public class FragmentInstanceExecution {
statistics.setReadyQueuedTime(context.getReadyQueueTime());
statistics.setInitDataQuerySourceCost(context.getInitQueryDataSourceCost());
+
statistics.setInitDataQuerySourceRetryCount(context.getInitQueryDataSourceRetryCount());
statistics.setSeqClosednNum(context.getClosedSeqFileNum());
statistics.setSeqUnclosedNum(context.getUnclosedSeqFileNum());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
index 401fc2e0c8f..241486750a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
@@ -41,10 +41,12 @@ public class QueryResourceMetricSet implements IMetricSet {
public static final String UNSEQUENCE_TSFILE = "unsequence_tsfile";
public static final String FLUSHING_MEMTABLE = "flushing_memtable";
public static final String WORKING_MEMTABLE = "working_memtable";
+ public static final String INIT_QUERY_RESOURCE_RETRY_COUNT = "retry_count";
private Histogram sequenceTsFileHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram unsequenceTsFileHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram flushingMemTableHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram workingMemTableHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram retryCountHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
public void recordQueryResourceNum(String type, int count) {
switch (type) {
@@ -60,11 +62,22 @@ public class QueryResourceMetricSet implements IMetricSet {
case WORKING_MEMTABLE:
workingMemTableHistogram.update(count);
break;
+ case INIT_QUERY_RESOURCE_RETRY_COUNT:
+ if (count > 0) {
+ retryCountHistogram.update(count);
+ }
+ break;
default:
break;
}
}
+ public void recordInitQueryResourceRetryCount(int count) {
+ if (count > 0) {
+ retryCountHistogram.update(count);
+ }
+ }
+
@Override
public void bindTo(AbstractMetricService metricService) {
sequenceTsFileHistogram =
@@ -91,11 +104,22 @@ public class QueryResourceMetricSet implements IMetricSet {
MetricLevel.IMPORTANT,
Tag.TYPE.toString(),
WORKING_MEMTABLE);
+ retryCountHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.QUERY_RESOURCE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ INIT_QUERY_RESOURCE_RETRY_COUNT);
}
@Override
public void unbindFrom(AbstractMetricService metricService) {
- Arrays.asList(SEQUENCE_TSFILE, UNSEQUENCE_TSFILE, FLUSHING_MEMTABLE,
WORKING_MEMTABLE)
+ Arrays.asList(
+ SEQUENCE_TSFILE,
+ UNSEQUENCE_TSFILE,
+ FLUSHING_MEMTABLE,
+ WORKING_MEMTABLE,
+ INIT_QUERY_RESOURCE_RETRY_COUNT)
.forEach(
type ->
metricService.remove(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
index 0141a86e627..b9f1a55b5bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
@@ -113,6 +113,17 @@ public class FragmentInstanceStatisticsDrawer {
String.format(
"Cost of initDataQuerySource: %.3f ms",
statistics.getInitDataQuerySourceCost() * NS_TO_MS_FACTOR));
+
+ if (statistics.isSetInitDataQuerySourceRetryCount()
+ && statistics.getInitDataQuerySourceRetryCount() > 0) {
+ addLine(
+ singleFragmentInstanceArea,
+ 1,
+ String.format(
+ "Retry count of initDataQuerySource: %d",
+ statistics.getInitDataQuerySourceRetryCount()));
+ }
+
addLine(
singleFragmentInstanceArea,
1,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a04c81f4801..b68a7199436 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -234,11 +234,6 @@ public class DataRegion implements IDataRegionForQuery {
/** closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done. */
private final Object closeStorageGroupCondition = new Object();
- /**
- * Avoid some tsfileResource is changed (e.g., from unsealed to sealed) when
a read is executed.
- */
- private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
-
/** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors =
new TreeMap<>();
@@ -1956,8 +1951,8 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** used for query engine */
@Override
+ @TestOnly
public QueryDataSource query(
List<PartialPath> pathList,
String singleDeviceId,
@@ -1965,31 +1960,154 @@ public class DataRegion implements IDataRegionForQuery
{
Filter globalTimeFilter,
List<Long> timePartitions)
throws QueryProcessException {
- try {
- List<TsFileResource> seqResources =
- getFileResourceListForQuery(
- tsFileManager.getTsFileList(true, timePartitions,
globalTimeFilter),
- pathList,
- singleDeviceId,
- context,
+ return query(
+ pathList, singleDeviceId, context, globalTimeFilter, timePartitions,
Long.MAX_VALUE);
+ }
+
+ /** used for query engine */
+ @Override
+ public QueryDataSource query(
+ List<PartialPath> pathList,
+ String singleDeviceId,
+ QueryContext context,
+ Filter globalTimeFilter,
+ List<Long> timePartitions,
+ long waitForLockTimeInMs)
+ throws QueryProcessException {
+
+ Pair<List<TsFileResource>, List<TsFileResource>> pair =
+ tsFileManager.getAllTsFileListForQuery(timePartitions,
globalTimeFilter);
+
+ List<TsFileResource> seqTsFileResouceList = pair.left;
+ List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+ List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+ boolean success =
+ tryGetFLushLock(
+ waitForLockTimeInMs,
+ singleDeviceId,
+ globalTimeFilter,
+ context.isDebug(),
+ seqTsFileResouceList,
+ unSeqTsFileResouceList,
+ needToUnLockList);
+
+ if (success) {
+ try {
+ List<TsFileResource> satisfiedSeqResourceList =
+ getFileResourceListForQuery(
+ seqTsFileResouceList, pathList, singleDeviceId, context,
globalTimeFilter, true);
+
+ List<TsFileResource> satisfiedUnSeqResourceList =
+ getFileResourceListForQuery(
+ unSeqTsFileResouceList, pathList, singleDeviceId, context,
globalTimeFilter, false);
+
+ QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+ SEQUENCE_TSFILE, satisfiedSeqResourceList.size());
+ QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+ UNSEQUENCE_TSFILE, satisfiedUnSeqResourceList.size());
+ return new QueryDataSource(satisfiedSeqResourceList,
satisfiedUnSeqResourceList);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ } finally {
+ clearAlreadyLockedList(needToUnLockList);
+ }
+ } else {
+ // failed to acquire the flush locks within the specified time
+ return null;
+ }
+ }
+
+ /**
+ * Try to acquire the flush (read) lock of the TsFileProcessor of every unclosed TsFile that
+ * satisfies the device and time filter, spending at most waitTimeInMs in total.
+ *
+ * @return true if all the locks are acquired successfully, otherwise false. If false is
+ * returned, needToUnLockList will always be empty, because this method releases every lock
+ * it has already acquired. If true is returned, the caller is responsible for releasing all
+ * the acquired locks in needToUnLockList.
+ */
+ private boolean tryGetFLushLock(
+ long waitTimeInMs,
+ String singleDeviceId,
+ Filter globalTimeFilter,
+ boolean isDebug,
+ List<TsFileResource> seqResources,
+ List<TsFileResource> unSeqResources,
+ List<TsFileProcessor> needToUnLockList) {
+ // deal with seq resources
+ for (TsFileResource tsFileResource : seqResources) {
+ // only need to acquire the flush lock for unclosed and satisfied tsfiles
+ if (!tsFileResource.isClosed()
+ && tsFileResource.isSatisfied(
+ singleDeviceId == null ? null : new
PlainDeviceID(singleDeviceId),
globalTimeFilter,
- true);
- List<TsFileResource> unseqResources =
- getFileResourceListForQuery(
- tsFileManager.getTsFileList(false, timePartitions,
globalTimeFilter),
- pathList,
- singleDeviceId,
- context,
+ true,
+ isDebug)) {
+ TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+ try {
+ long startTime = System.nanoTime();
+ if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+ // minus already consumed time
+ waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
+
+ needToUnLockList.add(tsFileProcessor);
+ // no remaining time slice
+ if (waitTimeInMs <= 0) {
+ clearAlreadyLockedList(needToUnLockList);
+ return false;
+ }
+ } else {
+ clearAlreadyLockedList(needToUnLockList);
+ return false;
+ }
+ } catch (InterruptedException e) {
+ clearAlreadyLockedList(needToUnLockList);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+ // deal with unSeq resources
+ for (TsFileResource tsFileResource : unSeqResources) {
+ if (!tsFileResource.isClosed()
+ && tsFileResource.isSatisfied(
+ singleDeviceId == null ? null : new
PlainDeviceID(singleDeviceId),
globalTimeFilter,
- false);
-
- QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE,
seqResources.size());
- QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE,
unseqResources.size());
+ false,
+ isDebug)) {
+ TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+ try {
+ long startTime = System.nanoTime();
+ if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+ // minus already consumed time
+ waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
+
+ needToUnLockList.add(tsFileProcessor);
+ // no remaining time slice
+ if (waitTimeInMs <= 0) {
+ clearAlreadyLockedList(needToUnLockList);
+ return false;
+ }
+ } else {
+ clearAlreadyLockedList(needToUnLockList);
+ return false;
+ }
+ } catch (InterruptedException e) {
+ clearAlreadyLockedList(needToUnLockList);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+ return true;
+ }
- return new QueryDataSource(seqResources, unseqResources);
- } catch (MetadataException e) {
- throw new QueryProcessException(e);
+ private void clearAlreadyLockedList(List<TsFileProcessor> needToUnLockList) {
+ for (TsFileProcessor processor : needToUnLockList) {
+ processor.readUnLock();
}
+ needToUnLockList.clear();
}
@Override
@@ -1997,31 +2115,51 @@ public class DataRegion implements IDataRegionForQuery {
List<PartialPath> pathList,
QueryContext queryContext,
Filter globalTimeFilter,
- List<Long> timePartitions)
- throws QueryProcessException {
- try {
- List<IFileScanHandle> seqFileScanHandles =
- getFileHandleListForQuery(
- tsFileManager.getTsFileList(true, timePartitions,
globalTimeFilter),
- pathList,
- queryContext,
- globalTimeFilter,
- true);
- List<IFileScanHandle> unseqFileScanHandles =
- getFileHandleListForQuery(
- tsFileManager.getTsFileList(false, timePartitions,
globalTimeFilter),
- pathList,
- queryContext,
- globalTimeFilter,
- false);
-
- QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE,
seqFileScanHandles.size());
- QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
- UNSEQUENCE_TSFILE, unseqFileScanHandles.size());
+ List<Long> timePartitions,
+ long waitForLockTimeInMs) {
+ Pair<List<TsFileResource>, List<TsFileResource>> pair =
+ tsFileManager.getAllTsFileListForQuery(timePartitions,
globalTimeFilter);
+
+ List<TsFileResource> seqTsFileResouceList = pair.left;
+ List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+ List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+ boolean success =
+ tryGetFLushLock(
+ waitForLockTimeInMs,
+ null,
+ globalTimeFilter,
+ queryContext.isDebug(),
+ seqTsFileResouceList,
+ unSeqTsFileResouceList,
+ needToUnLockList);
+
+ if (success) {
+ try {
+ List<IFileScanHandle> seqFileScanHandles =
+ getFileHandleListForQuery(
+ seqTsFileResouceList, pathList, queryContext,
globalTimeFilter, true);
+
+ // Reuse the unSeq snapshot taken above instead of re-listing via getTsFileList: it is the
+ // exact list whose unclosed processors were locked by tryGetFLushLock, so the seq and unSeq
+ // paths stay symmetric and no unlocked processor can slip into the WithoutLock query path.
+ List<IFileScanHandle> unSeqFileScanHandles =
+ getFileHandleListForQuery(
+ unSeqTsFileResouceList, pathList, queryContext, globalTimeFilter, false);
- return new QueryDataSourceForRegionScan(seqFileScanHandles,
unseqFileScanHandles);
- } catch (MetadataException e) {
- throw new QueryProcessException(e);
+ QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+ SEQUENCE_TSFILE, seqFileScanHandles.size());
+ QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+ UNSEQUENCE_TSFILE, unSeqFileScanHandles.size());
+ return new QueryDataSourceForRegionScan(seqFileScanHandles,
unSeqFileScanHandles);
+ } finally {
+ clearAlreadyLockedList(needToUnLockList);
+ }
+ } else {
+ // means that failed to acquire lock within the specific time
+ return null;
}
}
@@ -2030,25 +2168,19 @@ public class DataRegion implements IDataRegionForQuery {
List<PartialPath> partialPaths,
QueryContext context,
Filter globalTimeFilter,
- boolean isSeq)
- throws MetadataException {
+ boolean isSeq) {
List<IFileScanHandle> fileScanHandles = new ArrayList<>();
for (TsFileResource tsFileResource : tsFileResources) {
if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq,
context.isDebug())) {
continue;
}
- closeQueryLock.readLock().lock();
- try {
- if (tsFileResource.isClosed()) {
- fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource,
context));
- } else {
- tsFileResource
- .getProcessor()
- .queryForSeriesRegionScan(partialPaths, context,
fileScanHandles);
- }
- } finally {
- closeQueryLock.readLock().unlock();
+ if (tsFileResource.isClosed()) {
+ fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource,
context));
+ } else {
+ tsFileResource
+ .getProcessor()
+ .queryForSeriesRegionScanWithoutLock(partialPaths, context,
fileScanHandles);
}
}
return fileScanHandles;
@@ -2059,31 +2191,52 @@ public class DataRegion implements IDataRegionForQuery {
Map<IDeviceID, DeviceContext> devicePathToAligned,
QueryContext queryContext,
Filter globalTimeFilter,
- List<Long> timePartitions)
- throws QueryProcessException {
- try {
- List<IFileScanHandle> seqFileScanHandles =
- getFileHandleListForQuery(
- tsFileManager.getTsFileList(true, timePartitions,
globalTimeFilter),
- devicePathToAligned,
- queryContext,
- globalTimeFilter,
- true);
- List<IFileScanHandle> unseqFileScanHandles =
- getFileHandleListForQuery(
- tsFileManager.getTsFileList(false, timePartitions,
globalTimeFilter),
- devicePathToAligned,
- queryContext,
- globalTimeFilter,
- false);
+ List<Long> timePartitions,
+ long waitForLockTimeInMs) {
+
+ Pair<List<TsFileResource>, List<TsFileResource>> pair =
+ tsFileManager.getAllTsFileListForQuery(timePartitions,
globalTimeFilter);
+
+ List<TsFileResource> seqTsFileResouceList = pair.left;
+ List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+ List<TsFileProcessor> needToUnLockList = new ArrayList<>();
- QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE,
seqFileScanHandles.size());
- QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
- UNSEQUENCE_TSFILE, unseqFileScanHandles.size());
+ boolean success =
+ tryGetFLushLock(
+ waitForLockTimeInMs,
+ null,
+ globalTimeFilter,
+ queryContext.isDebug(),
+ seqTsFileResouceList,
+ unSeqTsFileResouceList,
+ needToUnLockList);
- return new QueryDataSourceForRegionScan(seqFileScanHandles,
unseqFileScanHandles);
- } catch (MetadataException e) {
- throw new QueryProcessException(e);
+ if (success) {
+ try {
+ List<IFileScanHandle> seqFileScanHandles =
+ getFileHandleListForQuery(
+ seqTsFileResouceList, devicePathToAligned, queryContext,
globalTimeFilter, true);
+
+ // Reuse the unSeq snapshot taken above instead of re-listing via getTsFileList: it is the
+ // exact list whose unclosed processors were locked by tryGetFLushLock, so the seq and unSeq
+ // paths stay symmetric and no unlocked processor can slip into the WithoutLock query path.
+ List<IFileScanHandle> unSeqFileScanHandles =
+ getFileHandleListForQuery(
+ unSeqTsFileResouceList, devicePathToAligned, queryContext, globalTimeFilter, false);
+
+ QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+ SEQUENCE_TSFILE, seqFileScanHandles.size());
+ QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+ UNSEQUENCE_TSFILE, unSeqFileScanHandles.size());
+ return new QueryDataSourceForRegionScan(seqFileScanHandles,
unSeqFileScanHandles);
+ } finally {
+ clearAlreadyLockedList(needToUnLockList);
+ }
+ } else {
+ // means that failed to acquire lock within the specific time
+ return null;
}
}
@@ -2092,25 +2245,19 @@ public class DataRegion implements IDataRegionForQuery {
Map<IDeviceID, DeviceContext> devicePathsToContext,
QueryContext context,
Filter globalTimeFilter,
- boolean isSeq)
- throws MetadataException {
+ boolean isSeq) {
List<IFileScanHandle> fileScanHandles = new ArrayList<>();
for (TsFileResource tsFileResource : tsFileResources) {
if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq,
context.isDebug())) {
continue;
}
- closeQueryLock.readLock().lock();
- try {
- if (tsFileResource.isClosed()) {
- fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource,
context));
- } else {
- tsFileResource
- .getProcessor()
- .queryForDeviceRegionScan(devicePathsToContext, context,
fileScanHandles);
- }
- } finally {
- closeQueryLock.readLock().unlock();
+ if (tsFileResource.isClosed()) {
+ fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource,
context));
+ } else {
+ tsFileResource
+ .getProcessor()
+ .queryForDeviceRegionScanWithoutLock(devicePathsToContext,
context, fileScanHandles);
}
}
return fileScanHandles;
@@ -2118,11 +2265,45 @@ public class DataRegion implements IDataRegionForQuery {
- /** lock the read lock of the insert lock */
+ /**
+ * Try to acquire the read lock of the insert lock and then the read lock of the TsFileResource
+ * list, sharing a single time budget of {@code waitMillis} milliseconds between the two.
+ *
+ * <p>If the budget is used up right after the insert lock is obtained, or the TsFileResource
+ * list lock cannot be obtained in time, the insert lock is released again. On interruption the
+ * thread's interrupt status is restored.
+ *
+ * @param waitMillis total time, in milliseconds, allowed for acquiring both read locks
+ * @return true if both read locks were acquired; false otherwise, in which case no lock taken
+ *     by this call is still held
+ */
@Override
- public void readLock() {
- // apply read lock for SG insert lock to prevent inconsistent with
concurrently writing memtable
- insertLock.readLock().lock();
+ public boolean tryReadLock(long waitMillis) {
+ try {
+ // apply read lock for SG insert lock to prevent inconsistent with
concurrently writing
+ // memtable
+ long startTime = System.nanoTime();
+ if (insertLock.readLock().tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
+ // minus already consumed time
+ waitMillis -= (System.nanoTime() - startTime) / 1_000_000;
+ // no remaining time slice
+ if (waitMillis <= 0) {
+ insertLock.readLock().unlock();
+ return false;
+ }
+ return tryGetTsFileManagerReadLock(waitMillis);
+ } else {
+ return false;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ /**
+ * Try to acquire the TsFileResource list read lock within {@code waitMillis} milliseconds.
+ *
+ * <p>Must only be called while this thread holds the insert lock's read lock. If the list lock
+ * cannot be obtained in time, or the wait is interrupted, that insert lock read lock is released
+ * here, so the caller ends up holding neither lock.
+ *
+ * @return true if the TsFileResource list read lock is now held (together with the insert lock)
+ */
+ private boolean tryGetTsFileManagerReadLock(long waitMillis) {
// apply read lock for TsFileResource list
- tsFileManager.readLock();
+ try {
+ if (tsFileManager.tryReadLock(waitMillis)) {
+ return true;
+ } else {
+ // failed to acquire tsFileManager read lock, we also need to unlock
the insertLock
+ insertLock.readLock().unlock();
+ return false;
+ }
+ } catch (InterruptedException e) {
+ // failed to acquire tsFileManager read lock, we also need to unlock the
insertLock
+ insertLock.readLock().unlock();
+ Thread.currentThread().interrupt();
+ return false;
+ }
}
/** unlock the read lock of insert lock */
@@ -2157,15 +2338,6 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSeq)
throws MetadataException {
- if (context.isDebug()) {
- DEBUG_LOGGER.info(
- "Path: {}, get tsfile list: {} isSeq: {} time filter: {}",
- pathList,
- tsFileResources,
- isSeq,
- (globalTimeFilter == null ? "null" : globalTimeFilter));
- }
-
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
for (TsFileResource tsFileResource : tsFileResources) {
@@ -2176,19 +2348,16 @@ public class DataRegion implements IDataRegionForQuery {
context.isDebug())) {
continue;
}
- closeQueryLock.readLock().lock();
try {
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
tsFileResource
.getProcessor()
- .query(pathList, context, tsfileResourcesForQuery,
globalTimeFilter);
+ .queryWithoutLock(pathList, context, tsfileResourcesForQuery,
globalTimeFilter);
}
} catch (IOException e) {
throw new MetadataException(e);
- } finally {
- closeQueryLock.readLock().unlock();
}
}
return tsfileResourcesForQuery;
@@ -2660,9 +2829,9 @@ public class DataRegion implements IDataRegionForQuery {
isValidateTsFileFailed =
!TsFileValidator.getInstance().validateTsFile(tsFileProcessor.getTsFileResource());
}
- closeQueryLock.writeLock().lock();
+ tsFileProcessor.writeLock();
try {
- tsFileProcessor.close();
+ tsFileProcessor.closeWithoutSettingResourceStatus();
if (isEmptyFile) {
tsFileProcessor.getTsFileResource().remove();
} else if (isValidateTsFileFailed) {
@@ -2671,10 +2840,11 @@ public class DataRegion implements IDataRegionForQuery {
renameAndHandleError(
tsFilePath + RESOURCE_SUFFIX, tsFilePath + RESOURCE_SUFFIX +
BROKEN_SUFFIX);
} else {
+
tsFileProcessor.getTsFileResource().setStatus(TsFileResourceStatus.NORMAL);
tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource());
}
} finally {
- closeQueryLock.writeLock().unlock();
+ tsFileProcessor.writeUnlock();
}
if (isEmptyFile || isValidateTsFileFailed) {
tsFileManager.remove(tsFileProcessor.getTsFileResource(),
tsFileProcessor.isSequence());
@@ -3424,6 +3594,7 @@ public class DataRegion implements IDataRegionForQuery {
return tsFileManager.getTsFileList(false);
}
+ @Override
public String getDataRegionId() {
return dataRegionId;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
index c18262145e5..14c6db513fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
@@ -34,12 +35,12 @@ import java.util.Map;
/** It's an interface that storage engine must provide for query engine */
public interface IDataRegionForQuery {
- /** lock the read lock for thread-safe */
- void readLock();
+ /**
+ * Try to acquire this region's read lock, waiting at most {@code waitMillis} milliseconds.
+ *
+ * @param waitMillis maximum time to wait, in milliseconds
+ * @return true if the lock was acquired; false if it could not be acquired in time
+ */
+ boolean tryReadLock(long waitMillis);
void readUnlock();
/** Get satisfied QueryDataSource from DataRegion for seriesScan */
+ @TestOnly
QueryDataSource query(
List<PartialPath> pathList,
String singleDeviceId,
@@ -48,21 +49,37 @@ public interface IDataRegionForQuery {
List<Long> timePartitions)
throws QueryProcessException;
+ /**
+ * Get satisfied QueryDataSource from DataRegion for seriesScan
+ *
+ * @return null means that failed to acquire lock within the specific time
+ */
+ QueryDataSource query(
+ List<PartialPath> pathList,
+ String singleDeviceId,
+ QueryContext context,
+ Filter globalTimeFilter,
+ List<Long> timePartitions,
+ long waitForLockTimeInMs)
+ throws QueryProcessException;
+
/** Get satisfied QueryDataSource from DataRegion for regionScan */
IQueryDataSource queryForDeviceRegionScan(
Map<IDeviceID, DeviceContext> devicePathsToContext,
QueryContext queryContext,
Filter globalTimeFilter,
- List<Long> timePartitions)
- throws QueryProcessException;
+ List<Long> timePartitions,
+ long waitForLockTimeInMs);
IQueryDataSource queryForSeriesRegionScan(
List<PartialPath> pathList,
QueryContext queryContext,
Filter globalTimeFilter,
- List<Long> timePartitions)
- throws QueryProcessException;
+ List<Long> timePartitions,
+ long waitForLockTimeInMs);
/** Get database name of this DataRegion */
String getDatabaseName();
+
+ String getDataRegionId();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
index c71111324c4..9e2f53b6b18 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
@@ -41,9 +41,14 @@ public class VirtualDataRegion implements
IDataRegionForQuery {
private static final String VIRTUAL_DB_NAME = "root.__virtual";
+ private static final String VIRTUAL_DATA_REGION_ID = "virtual_data_region";
+
public static final QueryDataSource EMPTY_QUERY_DATA_SOURCE =
new QueryDataSource(Collections.emptyList(), Collections.emptyList());
+ public static final QueryDataSource UNFINISHED_QUERY_DATA_SOURCE =
+ new QueryDataSource(Collections.emptyList(), Collections.emptyList());
+
private static final QueryDataSourceForRegionScan
EMPTY_REGION_QUERY_DATA_SOURCE =
new QueryDataSourceForRegionScan(Collections.emptyList(),
Collections.emptyList());
@@ -52,8 +57,9 @@ public class VirtualDataRegion implements IDataRegionForQuery
{
}
@Override
- public void readLock() {
- // do nothing, because itself is thread-safe already
+ public boolean tryReadLock(long waitMillis) {
+ // do nothing, always return true
+ return true;
}
@Override
@@ -69,6 +75,19 @@ public class VirtualDataRegion implements
IDataRegionForQuery {
Filter globalTimeFilter,
List<Long> timePartitions)
throws QueryProcessException {
+ return query(
+ pathList, singleDeviceId, context, globalTimeFilter, timePartitions,
Long.MAX_VALUE);
+ }
+
+ @Override
+ public QueryDataSource query(
+ List<PartialPath> pathList,
+ String singleDeviceId,
+ QueryContext context,
+ Filter globalTimeFilter,
+ List<Long> timePartitions,
+ long waitForLockTimeInMs)
+ throws QueryProcessException {
return EMPTY_QUERY_DATA_SOURCE;
}
@@ -77,8 +96,8 @@ public class VirtualDataRegion implements IDataRegionForQuery
{
Map<IDeviceID, DeviceContext> devicePathsToContext,
QueryContext queryContext,
Filter globalTimeFilter,
- List<Long> timePartitions)
- throws QueryProcessException {
+ List<Long> timePartitions,
+ long waitForLockTimeInMs) {
return EMPTY_REGION_QUERY_DATA_SOURCE;
}
@@ -87,8 +106,8 @@ public class VirtualDataRegion implements
IDataRegionForQuery {
List<PartialPath> pathList,
QueryContext queryContext,
Filter globalTimeFilter,
- List<Long> timePartitions)
- throws QueryProcessException {
+ List<Long> timePartitions,
+ long waitForLockTimeInMs) {
return EMPTY_REGION_QUERY_DATA_SOURCE;
}
@@ -97,6 +116,11 @@ public class VirtualDataRegion implements
IDataRegionForQuery {
return VIRTUAL_DB_NAME;
}
+ @Override
+ public String getDataRegionId() {
+ return VIRTUAL_DATA_REGION_ID;
+ }
+
private static class InstanceHolder {
private InstanceHolder() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index f797613b078..58aabe4a5ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -32,12 +32,16 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractWritableMemChunk.class);
+
protected static long RETRY_INTERVAL_MS = 100L;
protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
@@ -53,11 +57,18 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
protected void maybeReleaseTvList(TVList tvList) {
long startTimeInMs = System.currentTimeMillis();
boolean succeed = false;
+ int retryCount = 0;
while (!succeed) {
try {
tryReleaseTvList(tvList);
succeed = true;
} catch (MemoryNotEnoughException ex) {
+ // print log every 5 seconds
+ if (retryCount % 50 == 0) {
+ LOGGER.warn(
+ "Failed to transfer tvlist memory owner to query engine, {}",
ex.getMessage());
+ }
+ retryCount++;
long waitQueryInMs = System.currentTimeMillis() - startTimeInMs;
if (waitQueryInMs > MAX_WAIT_QUERY_MS) {
// Abort first query in the list. When all queries in the list have
been aborted,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 34070a848c9..2d58015a596 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1633,10 +1633,10 @@ public class TsFileProcessor {
}
/** Close this tsfile */
- public void close() throws TsFileProcessorException {
+ public void closeWithoutSettingResourceStatus() throws
TsFileProcessorException {
try {
// When closing resource file, its corresponding mod file is also closed.
- tsFileResource.close();
+ tsFileResource.closeWithoutSettingStatus();
} catch (IOException e) {
throw new TsFileProcessorException(e);
}
@@ -1820,7 +1820,7 @@ public class TsFileProcessor {
return new ArrayList<>(alignedChunkMetadataForOneDevice);
}
- public void queryForSeriesRegionScan(
+ public void queryForSeriesRegionScanWithoutLock(
List<PartialPath> pathList,
QueryContext queryContext,
List<IFileScanHandle> fileScanHandlesForQuery) {
@@ -1829,7 +1829,6 @@ public class TsFileProcessor {
Map<IDeviceID, Map<String, List<IChunkHandle>>>
deviceToMemChunkHandleMap = new HashMap<>();
Map<IDeviceID, Map<String, List<IChunkMetadata>>>
deviceToChunkMetadataListMap =
new HashMap<>();
- flushQueryLock.readLock().lock();
try {
for (PartialPath seriesPath : pathList) {
Map<String, List<IChunkMetadata>> measurementToChunkMetaList = new
HashMap<>();
@@ -1884,9 +1883,6 @@ public class TsFileProcessor {
QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE,
flushingMemTables.size());
QUERY_RESOURCE_METRICS.recordQueryResourceNum(
WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
-
- flushQueryLock.readLock().unlock();
- logFlushQueryReadUnlocked();
}
if (!deviceToMemChunkHandleMap.isEmpty() ||
!deviceToChunkMetadataListMap.isEmpty()) {
fileScanHandlesForQuery.add(
@@ -1903,7 +1899,7 @@ public class TsFileProcessor {
* Construct IFileScanHandle for data in memtable and the other ones in
flushing memtables. Then
* get the related ChunkMetadata of data on disk.
*/
- public void queryForDeviceRegionScan(
+ public void queryForDeviceRegionScanWithoutLock(
Map<IDeviceID, DeviceContext> devicePathsToContext,
QueryContext queryContext,
List<IFileScanHandle> fileScanHandlesForQuery) {
@@ -1912,7 +1908,6 @@ public class TsFileProcessor {
Map<IDeviceID, Map<String, List<IChunkHandle>>>
deviceToMemChunkHandleMap = new HashMap<>();
Map<IDeviceID, Map<String, List<IChunkMetadata>>>
deviceToChunkMetadataListMap =
new HashMap<>();
- flushQueryLock.readLock().lock();
try {
for (Map.Entry<IDeviceID, DeviceContext> entry :
devicePathsToContext.entrySet()) {
IDeviceID deviceID = entry.getKey();
@@ -1968,9 +1963,6 @@ public class TsFileProcessor {
QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE,
flushingMemTables.size());
QUERY_RESOURCE_METRICS.recordQueryResourceNum(
WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
-
- flushQueryLock.readLock().unlock();
- logFlushQueryReadUnlocked();
}
if (!deviceToMemChunkHandleMap.isEmpty() ||
!deviceToChunkMetadataListMap.isEmpty()) {
@@ -1991,18 +1983,40 @@ public class TsFileProcessor {
*
* @param seriesPaths selected paths
*/
+ public void queryWithoutLock(
+ List<PartialPath> seriesPaths,
+ QueryContext context,
+ List<TsFileResource> tsfileResourcesForQuery,
+ Filter globalTimeFilter)
+ throws IOException {
+ query(seriesPaths, context, tsfileResourcesForQuery, globalTimeFilter,
false);
+ }
+
+ @TestOnly
public void query(
List<PartialPath> seriesPaths,
QueryContext context,
List<TsFileResource> tsfileResourcesForQuery,
Filter globalTimeFilter)
throws IOException {
+ query(seriesPaths, context, tsfileResourcesForQuery, globalTimeFilter,
true);
+ }
+
+ private void query(
+ List<PartialPath> seriesPaths,
+ QueryContext context,
+ List<TsFileResource> tsfileResourcesForQuery,
+ Filter globalTimeFilter,
+ boolean needLock)
+ throws IOException {
long startTime = System.nanoTime();
try {
Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new
HashMap<>();
Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new
HashMap<>();
- flushQueryLock.readLock().lock();
+ if (needLock) {
+ flushQueryLock.readLock().lock();
+ }
try {
for (PartialPath seriesPath : seriesPaths) {
List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>();
@@ -2047,9 +2061,10 @@ public class TsFileProcessor {
QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE,
flushingMemTables.size());
QUERY_RESOURCE_METRICS.recordQueryResourceNum(
WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
-
- flushQueryLock.readLock().unlock();
- logFlushQueryReadUnlocked();
+ if (needLock) {
+ flushQueryLock.readLock().unlock();
+ logFlushQueryReadUnlocked();
+ }
}
if (!pathToReadOnlyMemChunkMap.isEmpty() ||
!pathToChunkMetadataListMap.isEmpty()) {
@@ -2159,6 +2174,22 @@ public class TsFileProcessor {
return flushingMemTables;
}
+ /** Acquire the flush-query write lock, blocking until it is available. */
+ public void writeLock() {
+ flushQueryLock.writeLock().lock();
+ }
+
+ /** Release the flush-query write lock acquired by {@link #writeLock()}. */
+ public void writeUnlock() {
+ flushQueryLock.writeLock().unlock();
+ }
+
+ /**
+ * Try to acquire the flush-query read lock within {@code waitInMs} milliseconds.
+ *
+ * @return true if the read lock was acquired
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public boolean tryReadLock(long waitInMs) throws InterruptedException {
+ return flushQueryLock.readLock().tryLock(waitInMs, TimeUnit.MILLISECONDS);
+ }
+
+ /** Release the flush-query read lock acquired by {@link #tryReadLock(long)}. */
+ public void readUnLock() {
+ flushQueryLock.readLock().unlock();
+ }
+
private void logFlushQueryWriteLocked() {
if (logger.isDebugEnabled()) {
logger.debug(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index fb7d3303576..1c831534991 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeInd
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -96,6 +98,41 @@ public class TsFileManager {
}
}
+ /**
+ * don't need to acquire lock again, caller should guarantee the lock has
been acquired
+ *
+ * @return left is seq resource list, right is unSeq resource list
+ */
+ public Pair<List<TsFileResource>, List<TsFileResource>>
getAllTsFileListForQuery(
+ List<Long> timePartitions, Filter timeFilter) {
+ List<TsFileResource> seq = new ArrayList<>();
+ List<TsFileResource> unSeq = new ArrayList<>();
+ if (timePartitions == null) {
+ for (Map.Entry<Long, TsFileResourceList> entry :
sequenceFiles.entrySet()) {
+ if (TimePartitionUtils.satisfyTimePartition(timeFilter,
entry.getKey())) {
+ seq.addAll(entry.getValue().getArrayList());
+ }
+ }
+ for (Map.Entry<Long, TsFileResourceList> entry :
unsequenceFiles.entrySet()) {
+ if (TimePartitionUtils.satisfyTimePartition(timeFilter,
entry.getKey())) {
+ unSeq.addAll(entry.getValue().getArrayList());
+ }
+ }
+ } else {
+ for (Long timePartitionId : timePartitions) {
+ TsFileResourceList tsFileResources =
sequenceFiles.get(timePartitionId);
+ if (tsFileResources != null) {
+ seq.addAll(tsFileResources.getArrayList());
+ }
+ tsFileResources = unsequenceFiles.get(timePartitionId);
+ if (tsFileResources != null) {
+ unSeq.addAll(tsFileResources.getArrayList());
+ }
+ }
+ }
+ return new Pair<>(seq, unSeq);
+ }
+
public List<TsFileResource> getTsFileListSnapshot(long timePartition,
boolean sequence) {
readLock();
try {
@@ -345,6 +382,10 @@ public class TsFileManager {
resourceListLock.readLock().lock();
}
+ /**
+ * Try to acquire the resource-list read lock within {@code waitMillis} milliseconds.
+ *
+ * @return true if the read lock was acquired; release it with {@link #readUnlock()}
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public boolean tryReadLock(long waitMillis) throws InterruptedException {
+ return resourceListLock.readLock().tryLock(waitMillis,
TimeUnit.MILLISECONDS);
+ }
+
public void readUnlock() {
resourceListLock.readLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 4920f39a645..1f092e4c97c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -74,6 +74,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
public class DataDriverTest {
@@ -91,6 +92,7 @@ public class DataDriverTest {
@Before
public void setUp() throws MetadataException, IOException,
WriteProcessException {
+
IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000);
SeriesReaderTestUtil.setUp(
measurementSchemas, deviceIds, seqResources, unSeqResources,
DATA_DRIVER_TEST_SG);
}
@@ -116,6 +118,7 @@ public class DataDriverTest {
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
DataRegion dataRegion = Mockito.mock(DataRegion.class);
+ Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
fragmentInstanceContext.setDataRegion(dataRegion);
@@ -174,10 +177,16 @@ public class DataDriverTest {
LimitOperator limitOperator =
new LimitOperator(driverContext.getOperatorContexts().get(3), 250,
timeJoinOperator);
+ fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
String deviceId = DATA_DRIVER_TEST_SG + ".device0";
Mockito.when(
dataRegion.query(
- driverContext.getPaths(), deviceId, fragmentInstanceContext,
null, null))
+ eq(driverContext.getPaths()),
+ eq(deviceId),
+ eq(fragmentInstanceContext),
+ Mockito.isNull(),
+ Mockito.isNull(),
+ Mockito.anyLong()))
.thenReturn(new QueryDataSource(seqResources, unSeqResources));
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
fragmentInstanceContext.initializeNumOfDrivers(1);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index f7a9c4c3854..da5fac70664 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -660,6 +660,7 @@ struct TFetchFragmentInstanceStatisticsResp {
14: optional i64 blockQueuedTime
15: optional string ip
16: optional string state
+ 17: optional i32 initDataQuerySourceRetryCount
}
/**