This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 897ed48ec9 [IOTDB-3247] Recover aligned sensors after deleting 
timeseries, query lost data (#6468)
897ed48ec9 is described below

commit 897ed48ec90652bf5ed3b13b424993e75381a280
Author: Chen YZ <[email protected]>
AuthorDate: Thu Jun 30 22:09:17 2022 +0800

    [IOTDB-3247] Recover aligned sensors after deleting timeseries, query lost 
data (#6468)
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  35 ++-
 .../engine/memtable/AlignedWritableMemChunk.java   |  26 +-
 .../db/wal/recover/file/TsFilePlanRedoerTest.java  | 303 +++++++++++++++++++++
 3 files changed, 340 insertions(+), 24 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 06726f3244..2b6d39d32b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -56,7 +56,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 public abstract class AbstractMemTable implements IMemTable {
   /** each memTable node has a unique int value identifier, init when 
recovering wal */
@@ -128,7 +130,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     IWritableMemChunkGroup memChunkGroup =
         memTableMap.computeIfAbsent(deviceId, k -> new 
WritableMemChunkGroup());
     for (IMeasurementSchema schema : schemaList) {
-      if (!memChunkGroup.contains(schema.getMeasurementId())) {
+      if (schema != null && 
!memChunkGroup.contains(schema.getMeasurementId())) {
         seriesNumber++;
         totalPointsNumThreshold += avgSeriesPointNumThreshold;
       }
@@ -144,10 +146,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
             k -> {
               seriesNumber += schemaList.size();
               totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * 
schemaList.size();
-              return new AlignedWritableMemChunkGroup(schemaList);
+              return new AlignedWritableMemChunkGroup(
+                  
schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()));
             });
     for (IMeasurementSchema schema : schemaList) {
-      if (!memChunkGroup.contains(schema.getMeasurementId())) {
+      if (schema != null && 
!memChunkGroup.contains(schema.getMeasurementId())) {
         seriesNumber++;
         totalPointsNumThreshold += avgSeriesPointNumThreshold;
       }
@@ -275,6 +278,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
       // use measurements[i] to ignore failed partial insert
       if (measurements[i] == null) {
+        schemaList.add(null);
         continue;
       }
       IMeasurementSchema schema = 
insertRowPlan.getMeasurementMNodes()[i].getSchema();
@@ -318,6 +322,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
       // use measurements[i] to ignore failed partial insert
       if (measurements[i] == null) {
+        schemaList.add(null);
         continue;
       }
       IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[i];
@@ -481,10 +486,10 @@ public abstract class AbstractMemTable implements 
IMemTable {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
-        continue;
+        schemaList.add(null);
+      } else {
+        schemaList.add(insertTabletPlan.getMeasurementMNodes()[i].getSchema());
       }
-      IMeasurementSchema schema = 
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
-      schemaList.add(schema);
     }
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), 
schemaList);
@@ -507,10 +512,10 @@ public abstract class AbstractMemTable implements 
IMemTable {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
       if (insertTabletNode.getColumns()[i] == null) {
-        continue;
+        schemaList.add(null);
+      } else {
+        schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
       }
-      IMeasurementSchema schema = insertTabletNode.getMeasurementSchemas()[i];
-      schemaList.add(schema);
     }
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), 
schemaList);
@@ -533,10 +538,10 @@ public abstract class AbstractMemTable implements 
IMemTable {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
-        continue;
+        schemaList.add(null);
+      } else {
+        schemaList.add(insertTabletPlan.getMeasurementMNodes()[i].getSchema());
       }
-      IMeasurementSchema schema = 
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
-      schemaList.add(schema);
     }
     if (schemaList.isEmpty()) {
       return;
@@ -562,10 +567,10 @@ public abstract class AbstractMemTable implements 
IMemTable {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
       if (insertTabletNode.getColumns()[i] == null) {
-        continue;
+        schemaList.add(null);
+      } else {
+        schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
       }
-      IMeasurementSchema schema = insertTabletNode.getMeasurementSchemas()[i];
-      schemaList.add(schema);
     }
     if (schemaList.isEmpty()) {
       return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index d40915792b..dbcbc5c207 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -182,22 +182,30 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
   }
 
+  /**
+   * Check schema of columns and return array that mapping existed schema to 
index of data column
+   *
+   * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. 
If some timeseries
+   *     have been deleted, there will be null in its slot.
+   * @return columnIndexArray: schemaList[i] is schema of 
columns[columnIndexArray[i]]
+   */
   private int[] checkColumnsInInsertPlan(List<IMeasurementSchema> 
schemaListInInsertPlan) {
     Map<String, Integer> measurementIdsInInsertPlan = new HashMap<>();
     for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
-      
measurementIdsInInsertPlan.put(schemaListInInsertPlan.get(i).getMeasurementId(),
 i);
-      if 
(!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
-        this.measurementIndexMap.put(
-            schemaListInInsertPlan.get(i).getMeasurementId(), 
measurementIndexMap.size());
-        this.schemaList.add(schemaListInInsertPlan.get(i));
-        this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
+      if (schemaListInInsertPlan.get(i) != null) {
+        
measurementIdsInInsertPlan.put(schemaListInInsertPlan.get(i).getMeasurementId(),
 i);
+        if 
(!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
+          this.measurementIndexMap.put(
+              schemaListInInsertPlan.get(i).getMeasurementId(), 
measurementIndexMap.size());
+          this.schemaList.add(schemaListInInsertPlan.get(i));
+          this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
+        }
       }
     }
     int[] columnIndexArray = new int[measurementIndexMap.size()];
     measurementIndexMap.forEach(
-        (measurementId, i) -> {
-          columnIndexArray[i] = 
measurementIdsInInsertPlan.getOrDefault(measurementId, -1);
-        });
+        (measurementId, i) ->
+            columnIndexArray[i] = 
measurementIdsInInsertPlan.getOrDefault(measurementId, -1));
     return columnIndexArray;
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
index 1be4ef012a..feab7d9d82 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
@@ -19,10 +19,12 @@
 package org.apache.iotdb.db.wal.recover.file;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -32,18 +34,22 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.utils.TsFileUtilsForRecoverTest;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
@@ -53,6 +59,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -64,13 +71,25 @@ public class TsFilePlanRedoerTest {
   private static final String SG_NAME = "root.recover_sg";
   private static final String DEVICE1_NAME = SG_NAME.concat(".d1");
   private static final String DEVICE2_NAME = SG_NAME.concat(".d2");
+  private static final String DEVICE3_NAME = SG_NAME.concat(".d3");
   private static final String FILE_NAME =
       TsFileUtilsForRecoverTest.getTestTsFilePath(SG_NAME, 0, 0, 1);
   private TsFileResource tsFileResource;
+  private CompressionType compressionType;
+  boolean prevIsAutoCreateSchemaEnabled;
+  boolean prevIsEnablePartialInsert;
 
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.envSetUp();
+
+    // set recover config, avoid creating deleted time series when recovering 
wal
+    prevIsAutoCreateSchemaEnabled =
+        IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+    
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+    prevIsEnablePartialInsert = 
IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
+    IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true);
+    compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
     IoTDB.schemaProcessor.setStorageGroup(new PartialPath(SG_NAME));
     IoTDB.schemaProcessor.createTimeseries(
         new PartialPath(DEVICE1_NAME.concat(".s1")),
@@ -96,6 +115,19 @@ public class TsFilePlanRedoerTest {
         TSEncoding.RLE,
         TSFileDescriptor.getInstance().getConfig().getCompressor(),
         Collections.emptyMap());
+    IoTDB.schemaProcessor.createAlignedTimeSeries(
+        new PartialPath(DEVICE3_NAME),
+        Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+        Arrays.asList(
+            TSDataType.INT32,
+            TSDataType.INT64,
+            TSDataType.BOOLEAN,
+            TSDataType.FLOAT,
+            TSDataType.TEXT),
+        Arrays.asList(
+            TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE, 
TSEncoding.PLAIN),
+        Arrays.asList(
+            compressionType, compressionType, compressionType, 
compressionType, compressionType));
   }
 
   @After
@@ -104,6 +136,11 @@ public class TsFilePlanRedoerTest {
       tsFileResource.close();
     }
     EnvironmentUtils.cleanEnv();
+    // reset config
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled);
+    
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(prevIsEnablePartialInsert);
   }
 
   @Test
@@ -161,6 +198,64 @@ public class TsFilePlanRedoerTest {
     assertEquals(6, time);
   }
 
+  @Test
+  public void testRedoInsertAlignedRowPlan() throws Exception {
+    // generate .tsfile and update resource in memory
+    File file = new File(FILE_NAME);
+    generateCompleteFile(file);
+    tsFileResource = new TsFileResource(file);
+    tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+    tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+
+    // generate InsertRowPlan
+    long time = 6;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN, 
TSDataType.FLOAT, TSDataType.TEXT
+        };
+    String[] columns = new String[] {1 + "", 1 + "", true + "", 1.0 + "", "1"};
+    InsertRowPlan insertRowPlan =
+        new InsertRowPlan(
+            new PartialPath(DEVICE3_NAME),
+            time,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            dataTypes,
+            columns,
+            true);
+
+    // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, 
null);
+    planRedoer.redoInsert(insertRowPlan);
+
+    // check data in memTable
+    IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
+    // check d3
+    AlignedPath fullPath =
+        new AlignedPath(
+            DEVICE3_NAME,
+            Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+            Arrays.asList(
+                new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+                new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+                new MeasurementSchema("s3", TSDataType.BOOLEAN, 
TSEncoding.RLE),
+                new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+                new MeasurementSchema("s5", TSDataType.TEXT, 
TSEncoding.PLAIN)));
+    ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, 
Long.MIN_VALUE, null);
+    IPointReader iterator = memChunk.getPointReader();
+    time = 6;
+    while (iterator.hasNextTimeValuePair()) {
+      TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+      assertEquals(time, timeValuePair.getTimestamp());
+      assertEquals(1, timeValuePair.getValue().getVector()[0].getInt());
+      assertEquals(1L, timeValuePair.getValue().getVector()[1].getLong());
+      assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
+      assertEquals(1, timeValuePair.getValue().getVector()[3].getFloat(), 
0.00001);
+      assertEquals(Binary.valueOf("1"), 
timeValuePair.getValue().getVector()[4].getBinary());
+      ++time;
+    }
+    assertEquals(7, time);
+  }
+
   @Test
   public void testRedoInsertTabletPlan() throws Exception {
     // generate .tsfile and update resource in memory
@@ -239,6 +334,94 @@ public class TsFilePlanRedoerTest {
     assertEquals(8, time);
   }
 
+  @Test
+  public void testRedoInsertAlignedTabletPlan() throws Exception {
+    // generate .tsfile and update resource in memory
+    File file = new File(FILE_NAME);
+    generateCompleteFile(file);
+    tsFileResource = new TsFileResource(file);
+    tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+    tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+
+    // generate InsertTabletPlan
+    long[] times = {6, 7, 8, 9};
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+
+    Object[] columns = new Object[5];
+    columns[0] = new int[times.length];
+    columns[1] = new long[times.length];
+    columns[2] = new boolean[times.length];
+    columns[3] = new float[times.length];
+    columns[4] = new Binary[times.length];
+
+    for (int r = 0; r < times.length; r++) {
+      ((int[]) columns[0])[r] = (r + 1) * 100;
+      ((long[]) columns[1])[r] = (r + 1) * 100;
+      ((boolean[]) columns[2])[r] = true;
+      ((float[]) columns[3])[r] = (r + 1) * 100;
+      ((Binary[]) columns[4])[r] = Binary.valueOf((r + 1) * 100 + "");
+    }
+
+    BitMap[] bitMaps = new BitMap[dataTypes.size()];
+    for (int i = 0; i < dataTypes.size(); i++) {
+      if (bitMaps[i] == null) {
+        bitMaps[i] = new BitMap(times.length);
+      }
+      // mark value of time=9 as null
+      bitMaps[i].mark(3);
+    }
+
+    InsertTabletPlan insertTabletPlan =
+        new InsertTabletPlan(
+            new PartialPath(DEVICE3_NAME),
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            dataTypes,
+            true);
+    insertTabletPlan.setTimes(times);
+    insertTabletPlan.setColumns(columns);
+    insertTabletPlan.setRowCount(times.length);
+    insertTabletPlan.setBitMaps(bitMaps);
+
+    // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, 
null);
+    planRedoer.redoInsert(insertTabletPlan);
+
+    // check data in memTable
+    IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
+    // check d3
+    AlignedPath fullPath =
+        new AlignedPath(
+            DEVICE3_NAME,
+            Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+            Arrays.asList(
+                new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+                new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+                new MeasurementSchema("s3", TSDataType.BOOLEAN, 
TSEncoding.RLE),
+                new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+                new MeasurementSchema("s5", TSDataType.TEXT, 
TSEncoding.PLAIN)));
+    ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, 
Long.MIN_VALUE, null);
+    IPointReader iterator = memChunk.getPointReader();
+    int time = 6;
+    while (iterator.hasNextTimeValuePair()) {
+      TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+      assertEquals(time, timeValuePair.getTimestamp());
+      assertEquals((time - 5) * 100, 
timeValuePair.getValue().getVector()[0].getInt());
+      assertEquals((time - 5) * 100L, 
timeValuePair.getValue().getVector()[1].getLong());
+      assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
+      assertEquals((time - 5) * 100, 
timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
+      assertEquals(
+          Binary.valueOf((time - 5) * 100 + ""),
+          timeValuePair.getValue().getVector()[4].getBinary());
+      ++time;
+    }
+    assertEquals(9, time);
+  }
+
   @Test
   public void testRedoOverLapPlanIntoSeqFile() throws Exception {
     // generate .tsfile and update resource in memory
@@ -371,6 +554,111 @@ public class TsFilePlanRedoerTest {
     assertTrue(modsFile.exists());
   }
 
+  @Test
+  public void testRedoAlignedInsertAfterDeleteTimeseries() throws Exception {
+    // some timeseries have been deleted
+    IoTDB.schemaProcessor.deleteTimeseries(new 
PartialPath(DEVICE3_NAME.concat(".s1")));
+    IoTDB.schemaProcessor.deleteTimeseries(new 
PartialPath(DEVICE3_NAME.concat(".s5")));
+    // generate .tsfile and update resource in memory
+    File file = new File(FILE_NAME);
+    generateCompleteFile(file);
+    tsFileResource = new TsFileResource(file);
+    tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+    tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+
+    // generate InsertTabletPlan
+    long[] times = {6, 7, 8, 9};
+    List<Integer> dataTypes =
+        Arrays.asList(
+            TSDataType.INT32.ordinal(),
+            TSDataType.INT64.ordinal(),
+            TSDataType.BOOLEAN.ordinal(),
+            TSDataType.FLOAT.ordinal(),
+            TSDataType.TEXT.ordinal());
+    Object[] columns =
+        new Object[] {
+          new int[times.length],
+          new long[times.length],
+          new boolean[times.length],
+          new float[times.length],
+          new Binary[times.length]
+        };
+    for (int r = 0; r < times.length; r++) {
+      ((int[]) columns[0])[r] = (r + 1) * 100;
+      ((long[]) columns[1])[r] = (r + 1) * 100;
+      ((boolean[]) columns[2])[r] = true;
+      ((float[]) columns[3])[r] = (r + 1) * 100;
+      ((Binary[]) columns[4])[r] = Binary.valueOf((r + 1) * 100 + "");
+    }
+    BitMap[] bitMaps = new BitMap[dataTypes.size()];
+    for (int i = 0; i < dataTypes.size(); i++) {
+      if (bitMaps[i] == null) {
+        bitMaps[i] = new BitMap(times.length);
+      }
+      // mark value of time=9 as null
+      bitMaps[i].mark(3);
+    }
+    InsertTabletPlan insertTabletPlan =
+        new InsertTabletPlan(
+            new PartialPath(DEVICE3_NAME),
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            dataTypes,
+            true);
+    insertTabletPlan.setTimes(times);
+    insertTabletPlan.setColumns(columns);
+    insertTabletPlan.setRowCount(times.length);
+    insertTabletPlan.setBitMaps(bitMaps);
+    // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, 
null);
+    planRedoer.redoInsert(insertTabletPlan);
+
+    // generate InsertRowPlan
+    int time = 9;
+    TSDataType[] dataTypes2 =
+        new TSDataType[] {
+          TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN, 
TSDataType.FLOAT, TSDataType.TEXT
+        };
+    String[] columns2 = new String[] {400 + "", 400 + "", true + "", 400.0 + 
"", "400"};
+    InsertRowPlan insertRowPlan =
+        new InsertRowPlan(
+            new PartialPath(DEVICE3_NAME),
+            time,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            dataTypes2,
+            columns2,
+            true);
+    // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
+    planRedoer.redoInsert(insertRowPlan);
+
+    // check data in memTable
+    IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
+    // check d3
+    AlignedPath fullPath =
+        new AlignedPath(
+            DEVICE3_NAME,
+            Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+            Arrays.asList(
+                new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+                new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+                new MeasurementSchema("s3", TSDataType.BOOLEAN, 
TSEncoding.RLE),
+                new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+                new MeasurementSchema("s5", TSDataType.TEXT, 
TSEncoding.PLAIN)));
+    ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, 
Long.MIN_VALUE, null);
+    IPointReader iterator = memChunk.getPointReader();
+    time = 6;
+    while (iterator.hasNextTimeValuePair()) {
+      TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+      assertEquals(time, timeValuePair.getTimestamp());
+      assertEquals(null, timeValuePair.getValue().getVector()[0]);
+      assertEquals((time - 5) * 100L, 
timeValuePair.getValue().getVector()[1].getLong());
+      assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
+      assertEquals((time - 5) * 100, 
timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
+      assertEquals(null, timeValuePair.getValue().getVector()[4]);
+      time++;
+    }
+    assertEquals(10, time);
+  }
+
   private void generateCompleteFile(File tsFile) throws IOException, 
WriteProcessException {
     try (TsFileWriter writer = new TsFileWriter(tsFile)) {
       writer.registerTimeseries(
@@ -381,6 +669,14 @@ public class TsFilePlanRedoerTest {
           new Path(DEVICE2_NAME), new MeasurementSchema("s1", 
TSDataType.FLOAT, TSEncoding.RLE));
       writer.registerTimeseries(
           new Path(DEVICE2_NAME), new MeasurementSchema("s2", 
TSDataType.DOUBLE, TSEncoding.RLE));
+      writer.registerAlignedTimeseries(
+          new Path(DEVICE3_NAME),
+          Arrays.asList(
+              new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+              new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+              new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
+              new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+              new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
       writer.write(
           new TSRecord(1, DEVICE1_NAME)
               .addTuple(new IntDataPoint("s1", 1))
@@ -397,6 +693,13 @@ public class TsFilePlanRedoerTest {
           new TSRecord(4, DEVICE2_NAME)
               .addTuple(new FloatDataPoint("s1", 4))
               .addTuple(new DoubleDataPoint("s2", 4)));
+      writer.writeAligned(
+          new TSRecord(5, DEVICE3_NAME)
+              .addTuple(new IntDataPoint("s1", 5))
+              .addTuple(new LongDataPoint("s2", 5))
+              .addTuple(new BooleanDataPoint("s3", true))
+              .addTuple(new FloatDataPoint("s4", 5))
+              .addTuple(new StringDataPoint("s5", Binary.valueOf("5"))));
     }
   }
 }

Reply via email to