This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 03b76ec4d92 [IOTDB-5916]Fix exception when file is deleted during
compaction selection (#9929)
03b76ec4d92 is described below
commit 03b76ec4d92550b8c9efef506f56a051ee6cea25
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Jun 9 17:40:51 2023 +0800
[IOTDB-5916]Fix exception when file is deleted during compaction selection
(#9929)
---
.../execute/task/CrossSpaceCompactionTask.java | 20 +-
.../execute/task/InnerSpaceCompactionTask.java | 17 +-
.../estimator/AbstractCompactionEstimator.java | 6 +-
.../ReadPointCrossCompactionEstimator.java | 42 +-
.../impl/RewriteCrossSpaceCompactionSelector.java | 14 +-
.../utils/CrossSpaceCompactionCandidate.java | 63 +-
.../engine/compaction/CompactionSchedulerTest.java | 5 +
.../compaction/CompactionTaskManagerTest.java | 2 +-
.../FastNonAlignedCrossCompactionTest.java | 145 ++
.../cross/CrossSpaceCompactionSelectorTest.java | 1648 ++++++++++++++++++++
.../db/engine/compaction/cross/MergeTest.java | 1 +
.../cross/RewriteCompactionFileSelectorTest.java | 47 +-
.../inner/InnerSpaceCompactionSelectorTest.java | 626 ++++++++
13 files changed, 2581 insertions(+), 55 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index 90119e01036..76f59f9bf5d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -273,7 +273,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
true);
} finally {
SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
- releaseAllLock();
+ releaseAllLocksAndResetStatus();
return isSuccess;
}
}
@@ -289,16 +289,13 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
&&
this.performer.getClass().isInstance(otherCrossCompactionTask.performer);
}
- private void releaseAllLock() {
- selectedSequenceFiles.forEach(x ->
x.setStatus(TsFileResourceStatus.NORMAL));
- selectedUnsequenceFiles.forEach(x ->
x.setStatus(TsFileResourceStatus.NORMAL));
+ private void releaseAllLocksAndResetStatus() {
+ resetCompactionCandidateStatusForAllSourceFiles();
for (TsFileResource tsFileResource : holdReadLockList) {
tsFileResource.readUnlock();
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
}
for (TsFileResource tsFileResource : holdWriteLockList) {
tsFileResource.writeUnlock();
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
}
holdReadLockList.clear();
holdWriteLockList.clear();
@@ -349,6 +346,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
@Override
public void resetCompactionCandidateStatusForAllSourceFiles() {
+ // Only reset status of the resources whose status is COMPACTING and
COMPACTING_CANDIDATE
selectedSequenceFiles.forEach(x ->
x.setStatus(TsFileResourceStatus.NORMAL));
selectedUnsequenceFiles.forEach(x ->
x.setStatus(TsFileResourceStatus.NORMAL));
}
@@ -377,6 +375,9 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
@Override
public boolean checkValidAndSetMerging() {
+ if (!tsFileManager.isAllowCompaction()) {
+ return false;
+ }
try {
SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60);
} catch (InterruptedException e) {
@@ -395,20 +396,17 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
}
private boolean addReadLock(List<TsFileResource> tsFileResourceList) {
- if (!tsFileManager.isAllowCompaction()) {
- return false;
- }
try {
for (TsFileResource tsFileResource : tsFileResourceList) {
tsFileResource.readLock();
holdReadLockList.add(tsFileResource);
if (!tsFileResource.setStatus(TsFileResourceStatus.COMPACTING)) {
- releaseAllLock();
+ releaseAllLocksAndResetStatus();
return false;
}
}
} catch (Throwable e) {
- releaseAllLock();
+ releaseAllLocksAndResetStatus();
throw e;
}
return true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index 86e3fd24179..3a84b6d0167 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -313,7 +313,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
isSequence());
}
} finally {
- releaseFileLocksAndResetMergingStatus();
+ releaseAllLocksAndResetStatus();
return isSuccess;
}
}
@@ -409,6 +409,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
@Override
public void resetCompactionCandidateStatusForAllSourceFiles() {
+ // Only reset status of the resources whose status is COMPACTING and
COMPACTING_CANDIDATE
selectedTsFileResourceList.forEach(x ->
x.setStatus(TsFileResourceStatus.NORMAL));
}
@@ -416,7 +417,8 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
* release the read lock and write lock of files if it is held, and set the
merging status of
* selected files to false
*/
- protected void releaseFileLocksAndResetMergingStatus() {
+ private void releaseAllLocksAndResetStatus() {
+ resetCompactionCandidateStatusForAllSourceFiles();
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
TsFileResource resource = selectedTsFileResourceList.get(i);
if (isHoldingReadLock[i]) {
@@ -425,13 +427,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if (isHoldingWriteLock[i]) {
resource.writeUnlock();
}
- try {
- // try to set the file's status back to NORMAL. If the status is
Deleted, its status won't
- // be changed
-
selectedTsFileResourceList.get(i).setStatus(TsFileResourceStatus.NORMAL);
- } catch (Throwable e) {
- LOGGER.error("Exception occurs when resetting resource status", e);
- }
}
}
@@ -446,12 +441,12 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
resource.readLock();
isHoldingReadLock[i] = true;
if (!resource.setStatus(TsFileResourceStatus.COMPACTING)) {
- releaseFileLocksAndResetMergingStatus();
+ releaseAllLocksAndResetStatus();
return false;
}
}
} catch (Throwable e) {
- releaseFileLocksAndResetMergingStatus();
+ releaseAllLocksAndResetStatus();
throw e;
}
return true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
index 9694af7d6d6..af5a51be07b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -61,9 +61,9 @@ public abstract class AbstractCompactionEstimator {
return reader;
}
- public void clear() throws IOException {
- for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
- sequenceReader.close();
+ public void close() throws IOException {
+ for (TsFileSequenceReader reader : fileReaderCache.values()) {
+ reader.close();
}
fileReaderCache.clear();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
index 2e1e9d0c291..13bcce13277 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -56,11 +57,42 @@ public class ReadPointCrossCompactionEstimator extends
AbstractCrossSpaceEstimat
@Override
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws
IOException {
- long cost = 0;
- cost += calculateReadingUnseqFile(unseqResource);
- cost += calculateReadingSeqFiles(seqResources);
- cost += calculatingWritingTargetFiles(seqResources, unseqResource);
- return cost;
+ if (!addReadLock(seqResources, unseqResource)) {
+ // there is file been deleted during selection, return -1
+ return -1L;
+ }
+ try {
+ long cost = 0;
+ cost += calculateReadingUnseqFile(unseqResource);
+ cost += calculateReadingSeqFiles(seqResources);
+ cost += calculatingWritingTargetFiles(seqResources, unseqResource);
+ return cost;
+ } finally {
+ releaseReadLock(seqResources, unseqResource);
+ }
+ }
+
+ /** Add read lock. Return false if any of the file were deleted. */
+ private boolean addReadLock(List<TsFileResource> seqResources,
TsFileResource unseqResource) {
+ List<TsFileResource> allResources = new ArrayList<>(seqResources);
+ allResources.add(unseqResource);
+ for (int i = 0; i < allResources.size(); i++) {
+ TsFileResource resource = allResources.get(i);
+ resource.readLock();
+ if (resource.isDeleted()) {
+ // release read lock
+ for (int j = 0; j <= i; j++) {
+ allResources.get(j).readUnlock();
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void releaseReadLock(List<TsFileResource> seqResources,
TsFileResource unseqResource) {
+ seqResources.forEach(TsFileResource::readUnlock);
+ unseqResource.readUnlock();
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index f0d30213071..635df78cd23 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -107,8 +107,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
* @return two lists of TsFileResource, the former is selected seqFiles and
the latter is selected
* unseqFiles or an empty array if there are no proper candidates by the
budget.
*/
- private CrossCompactionTaskResource selectOneTaskResources(
- CrossSpaceCompactionCandidate candidate) throws MergeException {
+ public CrossCompactionTaskResource
selectOneTaskResources(CrossSpaceCompactionCandidate candidate)
+ throws MergeException {
try {
LOGGER.debug(
"Selecting cross compaction task resources from {} seqFile, {}
unseqFiles",
@@ -121,7 +121,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
throw new MergeException(e);
} finally {
try {
- compactionEstimator.clear();
+ compactionEstimator.close();
} catch (IOException e) {
throw new MergeException(e);
}
@@ -164,6 +164,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
}
if (!latestSealedSeqFile.selected) {
targetSeqFiles.add(latestSealedSeqFile.resource);
+ latestSealedSeqFile.markAsSelected();
}
}
@@ -212,6 +213,10 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
List<TsFileResource> seqFiles,
long memoryCost)
throws IOException {
+ if (memoryCost == -1) {
+ // there is file been deleted during selection
+ return false;
+ }
TsFileNameGenerator.TsFileName unseqFileName =
TsFileNameGenerator.getTsFileName(unseqFile.getTsFile().getName());
// we add a hard limit for cross compaction that selected unseqFile should
reach a certain size
@@ -272,7 +277,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
long startTime = System.currentTimeMillis();
long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
// we record the variable `candidate` here is used for selecting more than
one
- // CrossCompactionTaskResources in this method
+ // CrossCompactionTaskResources in this method.
+ // Add read lock for candidate source files to avoid being deleted during
the selection.
CrossSpaceCompactionCandidate candidate =
new CrossSpaceCompactionCandidate(sequenceFileList,
unsequenceFileList, ttlLowerBound);
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
index 26d0ee0b1ab..074330b9196 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
@@ -78,20 +78,25 @@ public class CrossSpaceCompactionCandidate {
List<TsFileResourceCandidate> ret = new ArrayList<>();
// The startTime and endTime of each device are different in one TsFile.
So we need to do the
- // check
- // one by one. And we cannot skip any device in the unseq file because it
may lead to omission
- // of
- // target seq file
+ // check one by one. And we cannot skip any device in the unseq file
because it may lead to
+ // omission of target seq file
+ if (!unseqFile.hasDetailedDeviceInfo()) {
+ // unseq file resource has been deleted due to TTL and cannot upgrade to
DEVICE_TIME_INDEX
+ return false;
+ }
for (DeviceInfo unseqDeviceInfo : unseqFile.getDevices()) {
for (TsFileResourceCandidate seqFile : seqFiles) {
+ // If the seqFile may need to be selected but its invalid, the
selection should be
+ // terminated.
+ if ((!seqFile.isValidCandidate || !seqFile.hasDetailedDeviceInfo())
+ && seqFile.mayHasOverlapWithUnseqFile(unseqDeviceInfo)) {
+ return false;
+ }
if (!seqFile.containsDevice(unseqDeviceInfo.deviceId)) {
continue;
}
DeviceInfo seqDeviceInfo =
seqFile.getDeviceInfoById(unseqDeviceInfo.deviceId);
- // If the seqFile should be selected but its invalid, the selection
should be terminated.
- if (!seqFile.isValidCandidate && unseqDeviceInfo.startTime <=
seqDeviceInfo.endTime) {
- return false;
- }
+
// If the unsealed file is unclosed, the file should not be selected
only when its startTime
// is larger than endTime of unseqFile. Or, the selection should be
terminated.
if (seqFile.unsealed() && unseqDeviceInfo.endTime >=
seqDeviceInfo.startTime) {
@@ -192,6 +197,8 @@ public class CrossSpaceCompactionCandidate {
public boolean isValidCandidate;
private Map<String, DeviceInfo> deviceInfoMap;
+ private boolean hasDetailedDeviceInfo;
+
protected TsFileResourceCandidate(TsFileResource tsFileResource) {
this.resource = tsFileResource;
this.selected = false;
@@ -215,12 +222,22 @@ public class CrossSpaceCompactionCandidate {
}
deviceInfoMap = new LinkedHashMap<>();
if (resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) {
- DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex();
- for (String deviceId : timeIndex.getDevices()) {
- deviceInfoMap.put(
- deviceId,
- new DeviceInfo(
- deviceId, timeIndex.getStartTime(deviceId),
timeIndex.getEndTime(deviceId)));
+ // deserialize resource file
+ resource.readLock();
+ try {
+ if (!resource.resourceFileExists()) {
+ hasDetailedDeviceInfo = false;
+ return;
+ }
+ DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex();
+ for (String deviceId : timeIndex.getDevices()) {
+ deviceInfoMap.put(
+ deviceId,
+ new DeviceInfo(
+ deviceId, timeIndex.getStartTime(deviceId),
timeIndex.getEndTime(deviceId)));
+ }
+ } finally {
+ resource.readUnlock();
}
} else {
for (String deviceId : resource.getDevices()) {
@@ -230,9 +247,10 @@ public class CrossSpaceCompactionCandidate {
deviceId, resource.getStartTime(deviceId),
resource.getEndTime(deviceId)));
}
}
+ hasDetailedDeviceInfo = true;
}
- protected void markAsSelected() {
+ public void markAsSelected() {
this.selected = true;
}
@@ -250,6 +268,21 @@ public class CrossSpaceCompactionCandidate {
prepareDeviceInfos();
return deviceInfoMap.containsKey(deviceId);
}
+
+ protected boolean hasDetailedDeviceInfo() throws IOException {
+ prepareDeviceInfos();
+ return hasDetailedDeviceInfo;
+ }
+
+ protected boolean mayHasOverlapWithUnseqFile(DeviceInfo
unseqFileDeviceInfo)
+ throws IOException {
+ prepareDeviceInfos();
+ long endTime =
+ containsDevice(unseqFileDeviceInfo.deviceId)
+ ? getDeviceInfoById(unseqFileDeviceInfo.deviceId).endTime
+ : resource.getFileEndTime();
+ return unseqFileDeviceInfo.startTime <= endTime;
+ }
}
protected static class DeviceInfo {
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index eb9b1b64527..1dbb1bda0c4 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -1808,9 +1808,14 @@ public class CompactionSchedulerTest {
public void stopCompactionTaskManager() {
CompactionTaskManager.getInstance().clearCandidateQueue();
+ long sleepTime = 0;
while
(CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0)
{
try {
Thread.sleep(10);
+ sleepTime += 10;
+ if (sleepTime >= 20_000) {
+ fail();
+ }
} catch (Exception e) {
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 108385e84fd..01097f5ecaf 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -176,7 +176,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources.get(0).readUnlock();
CompactionTaskManager.getInstance().waitAllCompactionFinish();
- // an invalid task can be submitted to waiting queue, but should not be
submitted to thread pool
+ // an invalid task cannot be submitted to waiting queue and cannot be
submitted to thread pool
try {
Assert.assertFalse(manager.addTaskToWaitingQueue(task2));
Assert.assertEquals(manager.getExecutingTaskCount(), 0);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
index db75e9321a5..2c221ba27c9 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastNonAlignedCrossCompactionTest.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompacti
import
org.apache.iotdb.db.engine.compaction.execute.task.CrossSpaceCompactionTask;
import
org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -282,6 +283,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -545,6 +552,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -807,6 +820,18 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -1087,6 +1112,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -1466,6 +1497,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -1852,6 +1889,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -2248,6 +2291,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -2629,6 +2678,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -3011,6 +3066,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -3403,6 +3464,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -3850,6 +3917,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -4300,6 +4373,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -4750,6 +4829,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -5252,6 +5337,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -5699,6 +5790,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -5930,6 +6027,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -6223,6 +6326,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -6643,6 +6752,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -7025,6 +7140,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -7292,6 +7413,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -7445,6 +7572,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -7649,6 +7782,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
@@ -7883,6 +8022,12 @@ public class FastNonAlignedCrossCompactionTest extends
AbstractCompactionTest {
0,
0);
Assert.assertTrue(task.start());
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
validateSeqFiles(true);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionSelectorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionSelectorTest.java
index b5b289bea46..28d1fbed726 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionSelectorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionSelectorTest.java
@@ -21,8 +21,11 @@ package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.engine.compaction.execute.task.CrossSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.engine.compaction.selector.impl.RewriteCrossSpaceCompactionSelector;
import
org.apache.iotdb.db.engine.compaction.selector.utils.CrossCompactionTaskResource;
+import
org.apache.iotdb.db.engine.compaction.selector.utils.CrossSpaceCompactionCandidate;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -36,6 +39,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest {
@@ -168,4 +173,1647 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
selected = selector.selectCrossSpaceTask(seqResources, unseqResources);
Assert.assertEquals(0, selected.size());
}
+
+ @Test
+ public void testSeqFileWithDeviceIndexBeenDeletedBeforeSelection()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ // the file is deleted before selection
+ cd1.countDown();
+ cd2.await();
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num is not 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num is not
1");
+ }
+
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testSeqFileWithDeviceIndexBeenDeletedDuringSelectionAndAfterCopyingList()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ // the other thread holds write lock and delete file
successfully after copying list
+ cd1.countDown();
+ cd2.await();
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file should be 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num should
be 1");
+ }
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testSeqFileWithDeviceIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+ cd1.countDown();
+ cd2.await();
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be false");
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testSeqFileWithDeviceIndexBeenDeletedDuringSelectionAndBeforeSettingCompacting()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE and add into queue
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("fail to set status to compaction
candidate.");
+ }
+
+ // the other thread delete the file successfully when the
compaction task is blocked
+ // in the queue
+ cd1.countDown();
+ cd2.await();
+
+ if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be invalid.");
+ }
+
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testSeqFileWithFileIndexBeenDeletedDuringSelectionAndAfterCopyingList()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ seqResources.get(0).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(2).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ // the other thread holds write lock and delete file
successfully after copying list
+ cd1.countDown();
+ cd2.await();
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num should be
1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num should
be 1");
+ }
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join();
+ thread2.join();
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testSeqFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ seqResources.get(0).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(2).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+ cd1.countDown();
+ cd2.await();
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be false");
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join();
+ thread2.join();
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testSeqFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCompacting()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ seqResources.get(0).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(2).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE and add into queue
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("fail to set status to compaction
candidate.");
+ }
+
+ // the other thread delete the file successfully when the
compaction task is blocked
+ // in the queue
+ cd1.countDown();
+ cd2.await();
+
+ if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be invalid.");
+ }
+
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testSeqFileWithFileIndexBeenDeletedBeforeSelection()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(2).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ // the file is deleted before selection
+ cd1.countDown();
+ cd2.await();
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num is not 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num is not
1");
+ }
+
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testUnSeqFileWithDeviceIndexBeenDeletedBeforeSelection()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ // the file is deleted before selection
+ cd1.countDown();
+ cd2.await();
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num is not 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num is not
1");
+ }
+
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testUnSeqFileWithDeviceIndexBeenDeletedDuringSelectionAndAfterCopyingList()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ // the other thread holds write lock and delete file
successfully after copying list
+ cd1.countDown();
+ cd2.await();
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num is not 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num is not
1");
+ }
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testUnSeqFileWithDeviceIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+ cd1.countDown();
+ cd2.await();
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be false");
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testUnSeqFileWithDeviceIndexBeenDeletedDuringSelectionAndBeforeSettingCompacting()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE and add into queue
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("fail to set status to compaction
candidate.");
+ }
+
+ // the other thread delete the file successfully when the
compaction task is blocked
+ // in the queue
+ cd1.countDown();
+ cd2.await();
+
+ if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be invalid.");
+ }
+
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testUnSeqFileWithFileIndexBeenDeletedDuringSelectionAndAfterCopyingList()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ unseqResources.get(0).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+ unseqResources.get(2).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ // the other thread holds write lock and delete file
successfully after copying list
+ cd1.countDown();
+ cd2.await();
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num is not 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num is not
1");
+ }
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (i < 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testUnSeqFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ unseqResources.get(0).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+ unseqResources.get(2).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+ cd1.countDown();
+ cd2.await();
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if
(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be false");
+ }
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testUnSeqFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCompacting()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ unseqResources.get(0).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+ unseqResources.get(2).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ // copy candidate source file list and add read lock
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 5) {
+ throw new RuntimeException("selected seq file num is not 5");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 5) {
+ throw new RuntimeException("selected unseq file num is not
5");
+ }
+
+ // the other thread holds write lock and delete file
successfully before setting
+ // file status to COMPACTION_CANDIDATE
+
+ CrossSpaceCompactionTask crossSpaceCompactionTask =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ crossCompactionTaskResource.getSeqFiles(),
+ crossCompactionTaskResource.getUnseqFiles(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCrossCompactionPerformer()
+ .createInstance(),
+ CompactionTaskManager.currentTaskNum,
+ crossCompactionTaskResource.getTotalMemoryCost(),
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE and add into queue
+ if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("fail to set status to compaction
candidate.");
+ }
+
+ // the other thread delete the file successfully when the
compaction task is blocked
+ // in the queue
+ cd1.countDown();
+ cd2.await();
+
+ if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be invalid.");
+ }
+
+ for (int i = 0; i < unseqResources.size(); i++) {
+ TsFileResource resource = unseqResources.get(i);
+ if (i == 1) {
+ if (resource.getStatus() != TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource resource = seqResources.get(i);
+ if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testUnSeqFileWithFileIndexBeenDeletedBeforeSelection()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+ unseqResources.get(0).degradeTimeIndex();
+ unseqResources.get(1).degradeTimeIndex();
+ unseqResources.get(2).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ // the file is deleted before selection
+ cd1.countDown();
+ cd2.await();
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(
+ seqResources, unseqResources,
System.currentTimeMillis() - Long.MAX_VALUE);
+
+ CrossCompactionTaskResource crossCompactionTaskResource =
+ selector.selectOneTaskResources(candidate);
+ if (!crossCompactionTaskResource.isValid()) {
+ throw new RuntimeException("compaction task resource is not
valid");
+ }
+ if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
+ throw new RuntimeException("selected seq file num is not 1");
+ }
+ if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
+ throw new RuntimeException("selected unseq file num is not
1");
+ }
+
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = unseqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
index dc4759f969e..d697c05f134 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
@@ -176,6 +176,7 @@ abstract class MergeTest {
}
}
fileWriter.close();
+ tsFileResource.serialize();
}
void mkdirs(File file) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index fcc2f8bfc67..c4cb3423dfc 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -1002,6 +1001,7 @@ public class RewriteCompactionFileSelectorTest extends
MergeTest {
new Thread(
() -> {
try {
+ Thread.sleep(1000);
List<CrossCompactionTaskResource> selected =
selector.selectCrossSpaceTask(seqResources,
unseqResources);
} catch (Exception e) {
@@ -1010,12 +1010,49 @@ public class RewriteCompactionFileSelectorTest extends
MergeTest {
}
});
Thread thread2 =
+ new Thread(
+ () -> {
+ if (!seqResources.get(0).remove()) {
+ fail.set(true);
+ }
+ if (!unseqResources.get(0).remove()) {
+ fail.set(true);
+ }
+ });
+ thread1.start();
+ thread2.start();
+ thread1.join();
+ thread2.join();
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testDeleteAndDegradeInSelection() throws Exception {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ Thread thread1 =
new Thread(
() -> {
try {
- FileUtils.delete(seqResources.get(0).getTsFile());
- FileUtils.delete(unseqResources.get(0).getTsFile());
- } catch (IOException e) {
+ Thread.sleep(1000);
+ List<CrossCompactionTaskResource> selected =
+ selector.selectCrossSpaceTask(seqResources,
unseqResources);
+ Assert.assertEquals(1, selected.get(0).getSeqFiles().size());
+ Assert.assertEquals(1, selected.get(0).getUnseqFiles().size());
+ } catch (Exception e) {
+ logger.error("Exception occurs", e);
+ fail.set(true);
+ }
+ });
+ Thread thread2 =
+ new Thread(
+ () -> {
+ seqResources.get(1).degradeTimeIndex();
+ if (!seqResources.get(1).remove()) {
+ fail.set(true);
}
});
thread1.start();
@@ -1023,7 +1060,7 @@ public class RewriteCompactionFileSelectorTest extends
MergeTest {
thread1.join();
thread2.join();
if (fail.get()) {
- // fail();
+ Assert.fail();
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionSelectorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionSelectorTest.java
new file mode 100644
index 00000000000..307283e71ca
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionSelectorTest.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
+import
org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest {
+ @Before
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ super.setUp();
+
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ for (TsFileResource tsFileResource : seqResources) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
+ }
+ }
+
+ @Test
+ public void testFileWithDeviceIndexBeenDeletedBeforeSelection()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+
+ CountDownLatch cd = new CountDownLatch(1);
+
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ // the file is deleted before selection
+ cd.await();
+ SizeTieredCompactionSelector selector =
+ new SizeTieredCompactionSelector("", "", 0, true,
tsFileManager);
+ List<TsFileResource> resources =
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ List<List<TsFileResource>> taskResource =
selector.selectInnerSpaceTask(resources);
+ if (taskResource.size() != 2) {
+ throw new RuntimeException("task num is not 2");
+ }
+ if (taskResource.get(0).size() != 2 ||
taskResource.get(1).size() != 2) {
+ throw new RuntimeException("selected file num is not 2");
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testFileWithDeviceIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ SizeTieredCompactionSelector selector =
+ new SizeTieredCompactionSelector("", "", 0, true,
tsFileManager);
+ // copy candidate source file list
+ List<TsFileResource> resources =
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ List<List<TsFileResource>> taskResource =
selector.selectInnerSpaceTask(resources);
+
+ // the other thread holds write lock and delete files
successfully before setting
+ // status to COMPACTION_CANDIDATE
+ cd1.countDown();
+ cd2.await();
+
+ if (taskResource.size() != 3) {
+ throw new RuntimeException("task num is not 3");
+ }
+ for (int idx = 0; idx < taskResource.size(); idx++) {
+ List<TsFileResource> task = taskResource.get(idx);
+ if (task.size() != 2) {
+ throw new RuntimeException("selected file num is not 2");
+ }
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ task,
+ true,
+ new FastCompactionPerformer(false),
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if (idx == 0) {
+ if
(innerSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be false");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (i == 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be
DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } else {
+ if
(!innerSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testFileWithDeviceIndexBeenDeletedDuringSelectionAndBeforeSettingCompacting()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ SizeTieredCompactionSelector selector =
+ new SizeTieredCompactionSelector("", "", 0, true,
tsFileManager);
+ // copy candidate source file list
+ List<TsFileResource> resources =
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ List<List<TsFileResource>> taskResource =
selector.selectInnerSpaceTask(resources);
+
+ if (taskResource.size() != 3) {
+ throw new RuntimeException("task num is not 3");
+ }
+ for (int idx = 0; idx < taskResource.size(); idx++) {
+ List<TsFileResource> task = taskResource.get(idx);
+ if (task.size() != 2) {
+ throw new RuntimeException("selected file num is not 2");
+ }
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ task,
+ true,
+ new FastCompactionPerformer(false),
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId());
+
+ if
(!innerSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+
+ // set file status to COMPACTION_CANDIDATE
+ if (idx == 0) {
+ // the other thread holds write lock and delete files
successfully before
+ // setting status to COMPACTING
+ cd1.countDown();
+ cd2.await();
+
+ if (innerSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be invalid.");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (i == 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be
DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } else {
+ if (!innerSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be valid.");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTING) {
+ throw new RuntimeException("status should be
COMPACTING");
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testFileWithFileIndexBeenDeletedBeforeSelection()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ seqResources.get(0).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(3).degradeTimeIndex();
+
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ // the file is deleted before selection
+ cd1.countDown();
+ cd2.await();
+ SizeTieredCompactionSelector selector =
+ new SizeTieredCompactionSelector("", "", 0, true,
tsFileManager);
+ List<TsFileResource> resources =
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ List<List<TsFileResource>> taskResource =
selector.selectInnerSpaceTask(resources);
+ if (taskResource.size() != 2) {
+ throw new RuntimeException("task num is not 2");
+ }
+ if (taskResource.get(0).size() != 2 ||
taskResource.get(1).size() != 2) {
+ throw new RuntimeException("selected file num is not 2");
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ seqResources.get(0).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(3).degradeTimeIndex();
+
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ SizeTieredCompactionSelector selector =
+ new SizeTieredCompactionSelector("", "", 0, true,
tsFileManager);
+ // copy candidate source file list
+ List<TsFileResource> resources =
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ List<List<TsFileResource>> taskResource =
selector.selectInnerSpaceTask(resources);
+
+ // the other thread holds write lock and delete files
successfully before setting
+ // status to COMPACTION_CANDIDATE
+ cd1.countDown();
+ cd2.await();
+
+ if (taskResource.size() != 3) {
+ throw new RuntimeException("task num is not 3");
+ }
+ for (int idx = 0; idx < taskResource.size(); idx++) {
+ List<TsFileResource> task = taskResource.get(idx);
+ if (task.size() != 2) {
+ throw new RuntimeException("selected file num is not 2");
+ }
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ task,
+ true,
+ new FastCompactionPerformer(false),
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId());
+ // set file status to COMPACTION_CANDIDATE
+ if (idx == 0) {
+ if
(innerSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be false");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (i == 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be
DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } else {
+ if
(!innerSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
+ throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void
testFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCompacting()
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
+ createFiles(6, 2, 3, 50, 0, 10000, 50, 50, false, true);
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ AtomicBoolean fail = new AtomicBoolean(false);
+ seqResources.get(0).degradeTimeIndex();
+ seqResources.get(1).degradeTimeIndex();
+ seqResources.get(3).degradeTimeIndex();
+
+ CountDownLatch cd1 = new CountDownLatch(1);
+ CountDownLatch cd2 = new CountDownLatch(1);
+
+ // select files in cross compaction
+ Thread thread1 =
+ new Thread(
+ () -> {
+ try {
+ SizeTieredCompactionSelector selector =
+ new SizeTieredCompactionSelector("", "", 0, true,
tsFileManager);
+ // copy candidate source file list
+ List<TsFileResource> resources =
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ List<List<TsFileResource>> taskResource =
selector.selectInnerSpaceTask(resources);
+
+ if (taskResource.size() != 3) {
+ throw new RuntimeException("task num is not 3");
+ }
+ for (int idx = 0; idx < taskResource.size(); idx++) {
+ List<TsFileResource> task = taskResource.get(idx);
+ if (task.size() != 2) {
+ throw new RuntimeException("selected file num is not 2");
+ }
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ task,
+ true,
+ new FastCompactionPerformer(false),
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId());
+
+ if
(!innerSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
+ throw new RuntimeException("set status should be true");
+ }
+
+ // set file status to COMPACTION_CANDIDATE
+ if (idx == 0) {
+ // the other thread holds write lock and delete files
successfully before
+ // setting status to COMPACTING
+ cd1.countDown();
+ cd2.await();
+
+ if (innerSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be invalid.");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (i == 1) {
+ if (resource.getStatus() !=
TsFileResourceStatus.DELETED) {
+ throw new RuntimeException("status should be
DELETED");
+ }
+ } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
+ throw new RuntimeException("status should be NORMAL");
+ }
+ }
+ } else {
+ if (!innerSpaceCompactionTask.checkValidAndSetMerging()) {
+ throw new RuntimeException("cross space compaction task
should be valid.");
+ }
+ for (int i = 0; i < task.size(); i++) {
+ TsFileResource resource = task.get(i);
+ if (resource.getStatus() !=
TsFileResourceStatus.COMPACTING) {
+ throw new RuntimeException("status should be
COMPACTING");
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ // delete seq files
+ Thread thread2 =
+ new Thread(
+ () -> {
+ try {
+ cd1.await();
+ TsFileResource resource = seqResources.get(1);
+ // try to delete file
+ resource.writeLock();
+ resource.remove();
+ resource.writeUnlock();
+ cd2.countDown();
+ } catch (Exception e) {
+ fail.set(true);
+ e.printStackTrace();
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+ thread1.join(10000);
+ thread2.join(10000);
+ if (fail.get()) {
+ Assert.fail();
+ }
+ }
+}