This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 5c39f8a76f [To rel/0.13][IOTDB-4606]Fix getting wrong sensors and
schemas in compaction (#7567)
5c39f8a76f is described below
commit 5c39f8a76fcdaa920984418795d66990d2d8ebee
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Oct 12 18:53:34 2022 +0800
[To rel/0.13][IOTDB-4606]Fix getting wrong sensors and schemas in
compaction (#7567)
---
.../inner/utils/MultiTsFileDeviceIterator.java | 5 +-
.../db/engine/compaction/CompactionUtilsTest.java | 271 +++++++++++++++++++++
2 files changed, 275 insertions(+), 1 deletion(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index 6e2223d3c5..4e51465603 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -143,7 +143,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>();
// get schemas from the newest file to the oldest file
for (TsFileResource resource : tsFileResources) {
- if (!deviceIteratorMap.containsKey(resource)) {
+ if (!deviceIteratorMap.containsKey(resource)
+ || !deviceIteratorMap.get(resource).current().equals(currentDevice))
{
+ // if this tsfile has no more device or next device is not equals to
the current device,
+ // which means this tsfile does not contain the current device, then
skip it.
continue;
}
TsFileSequenceReader reader = readerMap.get(resource);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
index c793541bb1..fef8640e3a 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
@@ -4795,6 +4795,277 @@ public class CompactionUtilsTest extends
AbstractCompactionTest {
}
}
+ /** Different source files have different devices and measurements with
different schemas. */
+ @Test
+ public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurements()
throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, false);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+ // generate mods file
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3" +
PATH_SEPARATOR + "s" + i);
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+ setDataType(TSDataType.TEXT);
+ registerTimeseriesInMManger(2, 7, false);
+ List<Integer> deviceIndex = new ArrayList<>();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1450, 0,
false, true);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0,
false, false);
+
+ List<TsFileResource> targetResources =
+
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ CompactionUtils.compact(seqResources, unseqResources, targetResources);
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ targetResources.removeIf(resource -> resource == null);
+ Assert.assertEquals(3, targetResources.size());
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 0; i < 3; i++) {
+ if (i < 2) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d0"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d3"));
+ } else {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d3"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+ for (int j = 0; j < 7; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR +
"s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, tsDataType));
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ TSDataType.VECTOR,
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ targetResources,
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ while (batchData.hasCurrent()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i +
PATH_SEPARATOR + "s" + j)
+ >= batchData.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR
+ "s" + j,
+ batchData.currentTime());
+ count++;
+ batchData.next();
+ }
+ }
+ tsFilesReader.close();
+ if (i == 1 || i == 3) {
+ assertEquals(400, count);
+ } else if (i == 2) {
+ if (j < 4) {
+ assertEquals(1200, count);
+ } else if (j < 5) {
+ assertEquals(600, count);
+ } else {
+ assertEquals(0, count);
+ }
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+
+ /**
+ * Different source files have different aligned devices and measurements
with different schemas.
+ */
+ @Test
+ public void
testAlignedCrossSpaceCompactionWithDifferentDevicesAndMeasurements()
+ throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, true);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
+
+ // generate mods file
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003" +
PATH_SEPARATOR + "s" + i);
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+ setDataType(TSDataType.TEXT);
+ registerTimeseriesInMManger(2, 7, true);
+ List<Integer> deviceIndex = new ArrayList<>();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1450, 0,
true, true);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0,
true, false);
+
+ List<TsFileResource> targetResources =
+
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ CompactionUtils.compact(seqResources, unseqResources, targetResources);
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ targetResources.removeIf(resource -> resource == null);
+ Assert.assertEquals(3, targetResources.size());
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003");
+ for (int i = 0; i < 3; i++) {
+ if (i < 2) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10000"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10001"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10002"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10003"));
+ } else {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10000"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10001"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10002"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10003"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+ for (int j = 0; j < 7; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i + PATH_SEPARATOR
+ "s" + j,
+ Long.MIN_VALUE);
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ TSDataType dataType = i == 1 || i == 3 ? TSDataType.TEXT :
TSDataType.INT64;
+ schemas.add(new MeasurementSchema("s" + j, dataType));
+ AlignedPath path =
+ new AlignedPath(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+ Collections.singletonList("s" + j),
+ schemas);
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ TSDataType.VECTOR,
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ targetResources,
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ while (batchData.hasCurrent()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j)
+ >= batchData.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ batchData.currentTime());
+ count++;
+ batchData.next();
+ }
+ }
+ tsFilesReader.close();
+ if (i == 1 || i == 3) {
+ assertEquals(400, count);
+ } else if (i == 2) {
+ if (j < 4) {
+ assertEquals(1200, count);
+ } else if (j < 5) {
+ assertEquals(600, count);
+ } else {
+ assertEquals(0, count);
+ }
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+
@Test
public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);