This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 31f3458dde [IOTDB-4606]Fix getting wrong sensors and schemas in
compaction (#7566)
31f3458dde is described below
commit 31f3458dde61bcc7a04270812c48e61ffd0c5f16
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Oct 12 08:58:26 2022 +0800
[IOTDB-4606]Fix getting wrong sensors and schemas in compaction (#7566)
---
.../inner/utils/MultiTsFileDeviceIterator.java | 5 +-
.../impl/ReadPointCompactionPerformer.java | 4 +-
.../ReadPointCompactionPerformerTest.java | 266 +++++++++++++++++++++
3 files changed, 271 insertions(+), 4 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index 05983c69e0..365fe4e916 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -142,7 +142,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>();
// get schemas from the newest file to the oldest file
for (TsFileResource resource : tsFileResources) {
- if (!deviceIteratorMap.containsKey(resource)) {
+ if (!deviceIteratorMap.containsKey(resource)
+ || !deviceIteratorMap.get(resource).current().equals(currentDevice))
{
+ // if this tsfile has no more device or next device is not equals to
the current device,
+ // which means this tsfile does not contain the current device, then
skip it.
continue;
}
TsFileSequenceReader reader = readerMap.get(resource);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index c902011beb..5e559da93e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -188,9 +188,7 @@ public class ReadPointCompactionPerformer
AbstractCompactionWriter compactionWriter,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource)
- throws IOException, InterruptedException, IllegalPathException,
ExecutionException {
- MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
- deviceIterator.iterateNotAlignedSeries(device, false);
+ throws IOException, InterruptedException, ExecutionException {
Map<String, MeasurementSchema> schemaMap =
deviceIterator.getAllSchemasOfCurrentDevice();
List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
allMeasurements.sort((String::compareTo));
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index 5a3892f451..1d718647d9 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -4907,6 +4907,272 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
}
+ /** Different source files have different devices and measurements with
different schemas. */
+ @Test
+ public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurements()
throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, false);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+ // generate mods file
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3" +
PATH_SEPARATOR + "s" + i);
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+ setDataType(TSDataType.TEXT);
+ registerTimeseriesInMManger(2, 7, false);
+ List<Integer> deviceIndex = new ArrayList<>();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1450, 0,
false, true);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0,
false, false);
+
+ List<TsFileResource> targetResources =
+
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ targetResources.removeIf(resource -> resource == null);
+ Assert.assertEquals(3, targetResources.size());
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 0; i < 3; i++) {
+ if (i < 2) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d0"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d3"));
+ } else {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d3"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+ for (int j = 0; j < 7; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR +
"s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, tsDataType));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ tsDataType,
+
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i +
PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR
+ "s" + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i == 1 || i == 3) {
+ assertEquals(400, count);
+ } else if (i == 2) {
+ if (j < 4) {
+ assertEquals(1200, count);
+ } else if (j < 5) {
+ assertEquals(600, count);
+ } else {
+ assertEquals(0, count);
+ }
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+
+ /**
+ * Different source files have different aligned devices and measurements
with different schemas.
+ */
+ @Test
+ public void
testAlignedCrossSpaceCompactionWithDifferentDevicesAndMeasurements()
+ throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, true);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
+
+ // generate mods file
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001" +
PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003" +
PATH_SEPARATOR + "s" + i);
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE,
Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+ setDataType(TSDataType.TEXT);
+ registerTimeseriesInMManger(2, 7, true);
+ List<Integer> deviceIndex = new ArrayList<>();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1450, 0,
true, true);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0,
true, false);
+
+ List<TsFileResource> targetResources =
+
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ targetResources.removeIf(resource -> resource == null);
+ Assert.assertEquals(3, targetResources.size());
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003");
+ for (int i = 0; i < 3; i++) {
+ if (i < 2) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10000"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10001"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10002"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10003"));
+ } else {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10000"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10001"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10002"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG +
PATH_SEPARATOR + "d10003"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+ for (int j = 0; j < 7; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i + PATH_SEPARATOR
+ "s" + j,
+ Long.MIN_VALUE);
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ TSDataType dataType = i == 1 || i == 3 ? TSDataType.TEXT :
TSDataType.INT64;
+ schemas.add(new MeasurementSchema("s" + j, dataType));
+ AlignedPath path =
+ new AlignedPath(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+ Collections.singletonList("s" + j),
+ schemas);
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.TEXT,
+
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i +
PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i +
PATH_SEPARATOR + "s" + j,
+ iterator.currentTime());
+ count++;
+ System.out.println(iterator.currentTime());
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i == 1 || i == 3) {
+ assertEquals(400, count);
+ } else if (i == 2) {
+ if (j < 4) {
+ assertEquals(1200, count);
+ } else if (j < 5) {
+ assertEquals(600, count);
+ } else {
+ assertEquals(0, count);
+ }
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+
@Test
public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() throws
ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);