This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 31f3458dde [IOTDB-4606]Fix getting wrong sensors and schemas in compaction (#7566)
31f3458dde is described below

commit 31f3458dde61bcc7a04270812c48e61ffd0c5f16
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Oct 12 08:58:26 2022 +0800

    [IOTDB-4606]Fix getting wrong sensors and schemas in compaction (#7566)
---
 .../inner/utils/MultiTsFileDeviceIterator.java     |   5 +-
 .../impl/ReadPointCompactionPerformer.java         |   4 +-
 .../ReadPointCompactionPerformerTest.java          | 266 +++++++++++++++++++++
 3 files changed, 271 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index 05983c69e0..365fe4e916 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -142,7 +142,10 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
     Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>();
     // get schemas from the newest file to the oldest file
     for (TsFileResource resource : tsFileResources) {
-      if (!deviceIteratorMap.containsKey(resource)) {
+      if (!deviceIteratorMap.containsKey(resource)
+          || !deviceIteratorMap.get(resource).current().equals(currentDevice)) {
+        // if this tsfile has no more device or next device is not equals to the current device,
+        // which means this tsfile does not contain the current device, then skip it.
         continue;
       }
       TsFileSequenceReader reader = readerMap.get(resource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index c902011beb..5e559da93e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -188,9 +188,7 @@ public class ReadPointCompactionPerformer
       AbstractCompactionWriter compactionWriter,
       FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource)
-      throws IOException, InterruptedException, IllegalPathException, ExecutionException {
-    MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
-        deviceIterator.iterateNotAlignedSeries(device, false);
+      throws IOException, InterruptedException, ExecutionException {
     Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice();
     List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
     allMeasurements.sort((String::compareTo));
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index 5a3892f451..1d718647d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -4907,6 +4907,272 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
     }
   }
 
+  /** Different source files have different devices and measurements with different schemas. */
+  @Test
+  public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurements() throws Exception {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(4, 5, false);
+    createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+    createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+    createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+    createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+    // generate mods file
+    List<String> seriesPaths = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + 
PATH_SEPARATOR + "s" + i);
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + 
PATH_SEPARATOR + "s" + i);
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3" + 
PATH_SEPARATOR + "s" + i);
+    }
+    generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
+    setDataType(TSDataType.TEXT);
+    registerTimeseriesInMManger(2, 7, false);
+    List<Integer> deviceIndex = new ArrayList<>();
+    deviceIndex.add(1);
+    deviceIndex.add(3);
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      measurementIndex.add(i);
+    }
+
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1450, 0, 
false, true);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, 
false, false);
+
+    List<TsFileResource> targetResources =
+        
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, 
targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+    targetResources.removeIf(resource -> resource == null);
+    Assert.assertEquals(3, targetResources.size());
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 0; i < 3; i++) {
+      if (i < 2) {
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d0"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d2"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d3"));
+      } else {
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d3"));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 4; i++) {
+      TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+      for (int j = 0; j < 7; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + 
"s" + j,
+            Long.MIN_VALUE);
+        PartialPath path =
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+                "s" + j,
+                new MeasurementSchema("s" + j, tsDataType));
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                tsDataType,
+                
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + 
PATH_SEPARATOR + "s" + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR 
+ "s" + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (i == 1 || i == 3) {
+          assertEquals(400, count);
+        } else if (i == 2) {
+          if (j < 4) {
+            assertEquals(1200, count);
+          } else if (j < 5) {
+            assertEquals(600, count);
+          } else {
+            assertEquals(0, count);
+          }
+        } else {
+          assertEquals(0, count);
+        }
+      }
+    }
+  }
+
+  /**
+   * Different source files have different aligned devices and measurements with different schemas.
+   */
+  @Test
+  public void testAlignedCrossSpaceCompactionWithDifferentDevicesAndMeasurements()
+      throws Exception {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(4, 5, true);
+    createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
+    createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
+    createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
+    createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
+
+    // generate mods file
+    List<String> seriesPaths = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000" + 
PATH_SEPARATOR + "s" + i);
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001" + 
PATH_SEPARATOR + "s" + i);
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003" + 
PATH_SEPARATOR + "s" + i);
+    }
+    generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, 
Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
+    setDataType(TSDataType.TEXT);
+    registerTimeseriesInMManger(2, 7, true);
+    List<Integer> deviceIndex = new ArrayList<>();
+    deviceIndex.add(1);
+    deviceIndex.add(3);
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      measurementIndex.add(i);
+    }
+
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1450, 0, 
true, true);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, 
true, false);
+
+    List<TsFileResource> targetResources =
+        
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, 
targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+    targetResources.removeIf(resource -> resource == null);
+    Assert.assertEquals(3, targetResources.size());
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003");
+    for (int i = 0; i < 3; i++) {
+      if (i < 2) {
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10000"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10001"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10002"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10003"));
+      } else {
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10000"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10001"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10002"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + 
PATH_SEPARATOR + "d10003"));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 4; i++) {
+      TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+      for (int j = 0; j < 7; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i + PATH_SEPARATOR 
+ "s" + j,
+            Long.MIN_VALUE);
+        List<IMeasurementSchema> schemas = new ArrayList<>();
+        TSDataType dataType = i == 1 || i == 3 ? TSDataType.TEXT : 
TSDataType.INT64;
+        schemas.add(new MeasurementSchema("s" + j, dataType));
+        AlignedPath path =
+            new AlignedPath(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+                Collections.singletonList("s" + j),
+                schemas);
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                TSDataType.TEXT,
+                
FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i + 
PATH_SEPARATOR + "s" + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i + 
PATH_SEPARATOR + "s" + j,
+                iterator.currentTime());
+            count++;
+            System.out.println(iterator.currentTime());
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (i == 1 || i == 3) {
+          assertEquals(400, count);
+        } else if (i == 2) {
+          if (j < 4) {
+            assertEquals(1200, count);
+          } else if (j < 5) {
+            assertEquals(600, count);
+          } else {
+            assertEquals(0, count);
+          }
+        } else {
+          assertEquals(0, count);
+        }
+      }
+    }
+  }
+
   @Test
   public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() throws ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);

Reply via email to