This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 25b70f8b884 [IOTDB-5990][IOTDB-5991] Pipe: support transferring 
aligned tablets (#10137)
25b70f8b884 is described below

commit 25b70f8b884294ccccdd7710d5e39f382102ab4f
Author: 马子坤 <[email protected]>
AuthorDate: Sun Jun 18 06:45:53 2023 +0800

    [IOTDB-5990][IOTDB-5991] Pipe: support transferring aligned tablets (#10137)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/connector/legacy/IoTDBSyncConnector.java  | 15 ++++-
 .../pipe/connector/v1/IoTDBThriftConnectorV1.java  |  3 +-
 .../v1/request/PipeTransferTabletReq.java          | 48 +++++++++-----
 .../pipe/connector/v2/IoTDBThriftConnectorV2.java  |  7 +-
 .../iotdb/db/pipe/event/common/row/PipeRow.java    |  7 ++
 .../db/pipe/event/common/row/PipeRowCollector.java |  4 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 19 ++++--
 .../common/tablet/PipeRawTabletInsertionEvent.java | 20 ++++--
 .../tablet/TabletInsertionDataContainer.java       | 17 +++--
 .../tsfile/TsFileInsertionDataContainer.java       | 21 +++++-
 .../realtime/PipeRealtimeCollectEventFactory.java  |  3 +-
 .../pipe/event/PipeTabletInsertionEventTest.java   | 76 ++++++++++++++++++++--
 .../event/TsFileInsertionDataContainerTest.java    | 16 ++---
 13 files changed, 203 insertions(+), 53 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
index b4f48208c72..ecb03c87a7f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.thrift.TException;
@@ -183,12 +184,22 @@ public class IoTDBSyncConnector implements PipeConnector {
 
   private void doTransfer(PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeInsertionEvent)
       throws IoTDBConnectionException, StatementExecutionException {
-    sessionPool.insertTablet(pipeInsertNodeInsertionEvent.convertToTablet());
+    final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
+    if (pipeInsertNodeInsertionEvent.isAligned()) {
+      sessionPool.insertAlignedTablet(tablet);
+    } else {
+      sessionPool.insertTablet(tablet);
+    }
   }
 
   private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
       throws PipeException, TException, IoTDBConnectionException, 
StatementExecutionException {
-    sessionPool.insertTablet(pipeTabletInsertionEvent.convertToTablet());
+    final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
+    if (pipeTabletInsertionEvent.isAligned()) {
+      sessionPool.insertAlignedTablet(tablet);
+    } else {
+      sessionPool.insertTablet(tablet);
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index 84ef31885f2..bb18acda62d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -163,7 +163,8 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferTabletReq.toTPipeTransferReq(
-                pipeRawTabletInsertionEvent.convertToTablet()));
+                pipeRawTabletInsertionEvent.convertToTablet(),
+                pipeRawTabletInsertionEvent.isAligned()));
 
     if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
index ead686a6acf..f69fbee9695 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
@@ -32,6 +32,8 @@ import 
org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -39,22 +41,35 @@ import 
org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 
 public class PipeTransferTabletReq extends TPipeTransferReq {
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTabletReq.class);
+
   private Tablet tablet;
+  private boolean isAligned;
 
-  public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet) throws 
IOException {
+  public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet, 
boolean isAligned)
+      throws IOException {
     final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
 
     tabletReq.tablet = tablet;
 
     tabletReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
     tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
-    tabletReq.body = tablet.serialize();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      tablet.serialize(outputStream);
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      tabletReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
     return tabletReq;
   }
 
@@ -67,7 +82,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
     return true;
   }
 
-  public static void sortTablet(Tablet tablet) {
+  private static void sortTablet(Tablet tablet) {
     /*
      * following part of code sort the batch data by time,
      * so we can insert continuous data in value list to get a better 
performance
@@ -180,6 +195,19 @@ public class PipeTransferTabletReq extends 
TPipeTransferReq {
     return sortedBitMap;
   }
 
+  public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
+    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+
+    tabletReq.tablet = Tablet.deserialize(transferReq.body);
+    tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
+
+    tabletReq.version = transferReq.version;
+    tabletReq.type = transferReq.type;
+    tabletReq.body = transferReq.body;
+
+    return tabletReq;
+  }
+
   public InsertTabletStatement constructStatement() {
     if (!checkSorted(tablet)) {
       sortTablet(tablet);
@@ -194,7 +222,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq 
{
       }
 
       request.setPrefixPath(tablet.deviceId);
-      request.setIsAligned(false);
+      request.setIsAligned(isAligned);
       request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
       request.setValues(SessionUtils.getValueBuffer(tablet));
       request.setSize(tablet.rowSize);
@@ -207,16 +235,4 @@ public class PipeTransferTabletReq extends 
TPipeTransferReq {
       return null;
     }
   }
-
-  public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
-    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
-
-    tabletReq.tablet = Tablet.deserialize(transferReq.body);
-
-    tabletReq.version = transferReq.version;
-    tabletReq.type = transferReq.type;
-    tabletReq.body = transferReq.body;
-
-    return tabletReq;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
index 3beb764526d..2773e24b9c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -26,8 +26,6 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
@@ -75,7 +73,6 @@ public class IoTDBThriftConnectorV2 implements PipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
-  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private static final IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
       ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER =
@@ -183,7 +180,9 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
           (PipeRawTabletInsertionEvent) tabletInsertionEvent;
       final PipeTransferTabletReq pipeTransferTabletReq =
-          
PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet());
+          PipeTransferTabletReq.toTPipeTransferReq(
+              pipeRawTabletInsertionEvent.convertToTablet(),
+              pipeRawTabletInsertionEvent.isAligned());
       final PipeTransferRawTabletInsertionEventHandler 
pipeTransferTabletReqHandler =
           new PipeTransferRawTabletInsertionEventHandler(
               requestCommitId, pipeTransferTabletReq, this);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 24ad5e108fc..f259b84a2e7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -35,6 +35,7 @@ public class PipeRow implements Row {
   private final int rowIndex;
 
   private final String deviceId;
+  private final boolean isAligned;
   private final MeasurementSchema[] measurementSchemaList;
 
   private final long[] timestampColumn;
@@ -47,6 +48,7 @@ public class PipeRow implements Row {
   public PipeRow(
       int rowIndex,
       String deviceId,
+      boolean isAligned,
       MeasurementSchema[] measurementSchemaList,
       long[] timestampColumn,
       TSDataType[] valueColumnTypes,
@@ -55,6 +57,7 @@ public class PipeRow implements Row {
       String[] columnNameStringList) {
     this.rowIndex = rowIndex;
     this.deviceId = deviceId;
+    this.isAligned = isAligned;
     this.measurementSchemaList = measurementSchemaList;
     this.timestampColumn = timestampColumn;
     this.valueColumnTypes = valueColumnTypes;
@@ -165,6 +168,10 @@ public class PipeRow implements Row {
     return deviceId;
   }
 
+  public boolean isAligned() {
+    return isAligned;
+  }
+
   public MeasurementSchema[] getMeasurementSchemaList() {
     return measurementSchemaList;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index ea989d48027..ac0cbc6e96f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -35,6 +35,7 @@ public class PipeRowCollector implements RowCollector {
 
   private final List<TabletInsertionEvent> tabletInsertionEventList = new 
ArrayList<>();
   private Tablet tablet = null;
+  private boolean isAligned = false;
 
   @Override
   public void collectRow(Row row) {
@@ -50,6 +51,7 @@ public class PipeRowCollector implements RowCollector {
       final List<MeasurementSchema> measurementSchemaList =
           new ArrayList<>(Arrays.asList(measurementSchemaArray));
       tablet = new Tablet(deviceId, measurementSchemaList);
+      isAligned = pipeRow.isAligned();
       tablet.initBitMaps();
     }
 
@@ -78,7 +80,7 @@ public class PipeRowCollector implements RowCollector {
 
   private void collectTabletInsertionEvent() {
     if (tablet != null) {
-      tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet));
+      tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet, 
isAligned));
     }
     this.tablet = null;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 5623d7715a6..0933e1a3c71 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -45,22 +45,25 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   private final WALEntryHandler walEntryHandler;
   private final ProgressIndex progressIndex;
+  private final boolean isAligned;
 
   private TabletInsertionDataContainer dataContainer;
 
   public PipeInsertNodeTabletInsertionEvent(
-      WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
-    this(walEntryHandler, progressIndex, null, null);
+      WALEntryHandler walEntryHandler, ProgressIndex progressIndex, boolean 
isAligned) {
+    this(walEntryHandler, progressIndex, isAligned, null, null);
   }
 
   private PipeInsertNodeTabletInsertionEvent(
       WALEntryHandler walEntryHandler,
       ProgressIndex progressIndex,
+      boolean isAligned,
       PipeTaskMeta pipeTaskMeta,
       String pattern) {
     super(pipeTaskMeta, pattern);
     this.walEntryHandler = walEntryHandler;
     this.progressIndex = progressIndex;
+    this.isAligned = isAligned;
   }
 
   public InsertNode getInsertNode() throws WALPipeException {
@@ -108,7 +111,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   public PipeInsertNodeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeInsertNodeTabletInsertionEvent(
-        walEntryHandler, progressIndex, pipeTaskMeta, pattern);
+        walEntryHandler, progressIndex, isAligned, pipeTaskMeta, pattern);
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
@@ -139,6 +142,12 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     }
   }
 
+  /////////////////////////// convertToTablet ///////////////////////////
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
   public Tablet convertToTablet() {
     try {
       if (dataContainer == null) {
@@ -155,11 +164,13 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public String toString() {
-    return "PipeRawTabletInsertionEvent{"
+    return "PipeInsertNodeTabletInsertionEvent{"
         + "walEntryHandler="
         + walEntryHandler
         + ", progressIndex="
         + progressIndex
+        + ", isAligned="
+        + isAligned
         + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index c8bc2fd55ab..594c0a38291 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -31,16 +31,18 @@ import java.util.function.BiConsumer;
 public class PipeRawTabletInsertionEvent implements TabletInsertionEvent {
 
   private final Tablet tablet;
+  private final boolean isAligned;
   private final String pattern;
 
   private TabletInsertionDataContainer dataContainer;
 
-  public PipeRawTabletInsertionEvent(Tablet tablet) {
-    this(Objects.requireNonNull(tablet), null);
+  public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
+    this(tablet, isAligned, null);
   }
 
-  public PipeRawTabletInsertionEvent(Tablet tablet, String pattern) {
+  public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String 
pattern) {
     this.tablet = Objects.requireNonNull(tablet);
+    this.isAligned = isAligned;
     this.pattern = pattern;
   }
 
@@ -53,7 +55,7 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
   @Override
   public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     if (dataContainer == null) {
-      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+      dataContainer = new TabletInsertionDataContainer(tablet, isAligned, 
getPattern());
     }
     return dataContainer.processRowByRow(consumer);
   }
@@ -61,11 +63,17 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
   @Override
   public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     if (dataContainer == null) {
-      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+      dataContainer = new TabletInsertionDataContainer(tablet, isAligned, 
getPattern());
     }
     return dataContainer.processTablet(consumer);
   }
 
+  /////////////////////////// convertToTablet ///////////////////////////
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
   public Tablet convertToTablet() {
     final String pattern = getPattern();
 
@@ -76,7 +84,7 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
 
     // if pattern is not "root", we need to convert the tablet
     if (dataContainer == null) {
-      dataContainer = new TabletInsertionDataContainer(tablet, pattern);
+      dataContainer = new TabletInsertionDataContainer(tablet, isAligned, 
pattern);
     }
     return dataContainer.convertToTablet();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 49275556cd2..86fbbd28f6e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
 public class TabletInsertionDataContainer {
 
   private String deviceId;
+  private boolean isAligned;
   private MeasurementSchema[] measurementSchemaList;
   private String[] columnNameStringList;
 
@@ -76,8 +77,8 @@ public class TabletInsertionDataContainer {
     }
   }
 
-  public TabletInsertionDataContainer(Tablet tablet, String pattern) {
-    parse(tablet, pattern);
+  public TabletInsertionDataContainer(Tablet tablet, boolean isAligned, String 
pattern) {
+    parse(tablet, isAligned, pattern);
   }
 
   //////////////////////////// parse ////////////////////////////
@@ -87,6 +88,7 @@ public class TabletInsertionDataContainer {
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
     this.deviceId = insertRowNode.getDevicePath().getFullPath();
+    this.isAligned = insertRowNode.isAligned();
     this.timestampColumn = new long[] {insertRowNode.getTime()};
 
     generateColumnIndexMapper(
@@ -153,6 +155,7 @@ public class TabletInsertionDataContainer {
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
     this.deviceId = insertTabletNode.getDevicePath().getFullPath();
+    this.isAligned = insertTabletNode.isAligned();
     this.timestampColumn = insertTabletNode.getTimes();
 
     generateColumnIndexMapper(
@@ -204,11 +207,12 @@ public class TabletInsertionDataContainer {
     rowCount = timestampColumn.length;
   }
 
-  private void parse(Tablet tablet, String pattern) {
+  private void parse(Tablet tablet, boolean isAligned, String pattern) {
     final int originColumnSize = tablet.getSchemas().size();
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
     this.deviceId = tablet.deviceId;
+    this.isAligned = isAligned;
     this.timestampColumn = tablet.timestamps;
 
     final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
@@ -311,6 +315,7 @@ public class TabletInsertionDataContainer {
           new PipeRow(
               i,
               deviceId,
+              isAligned,
               measurementSchemaList,
               timestampColumn,
               valueColumnTypes,
@@ -328,7 +333,11 @@ public class TabletInsertionDataContainer {
     return rowCollector.convertToTabletInsertionEvents();
   }
 
-  ////////////////////////////  convert  ////////////////////////////
+  ////////////////////////////  convertToTablet  ////////////////////////////
+
+  public boolean isAligned() {
+    return isAligned;
+  }
 
   public Tablet convertToTablet() {
     if (tablet != null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index e9347d31202..a9a5f657d83 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -24,12 +24,15 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +58,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
   private final TsFileReader tsFileReader;
 
   private final Iterator<Map.Entry<String, List<String>>> 
deviceMeasurementsMapIterator;
+  private final Map<String, Boolean> deviceIsAlignedMap;
   private final Map<String, TSDataType> measurementDataTypeMap;
 
   public TsFileInsertionDataContainer(File tsFile, String pattern, long 
startTime, long endTime)
@@ -72,6 +76,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       tsFileReader = new TsFileReader(tsFileSequenceReader);
 
       deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+      deviceIsAlignedMap = readDeviceIsAlignedMap();
       measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
     } catch (Exception e) {
       LOGGER.error("failed to create TsFileInsertionDataContainer", e);
@@ -119,6 +124,17 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
     return filteredDeviceMeasurementsMap;
   }
 
+  private Map<String, Boolean> readDeviceIsAlignedMap() throws IOException {
+    final Map<String, Boolean> deviceIsAlignedMap = new HashMap<>();
+    final TsFileDeviceIterator deviceIsAlignedIterator =
+        tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
+    while (deviceIsAlignedIterator.hasNext()) {
+      final Pair<String, Boolean> deviceIsAlignedPair = 
deviceIsAlignedIterator.next();
+      deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
+    }
+    return deviceIsAlignedMap;
+  }
+
   /** @return TabletInsertionEvent in a streaming way */
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
@@ -159,8 +175,9 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
               throw new NoSuchElementException();
             }
 
-            final TabletInsertionEvent next =
-                new PipeRawTabletInsertionEvent(tabletIterator.next());
+            final Tablet tablet = tabletIterator.next();
+            final boolean isAligned = 
deviceIsAlignedMap.getOrDefault(tablet.deviceId, false);
+            final TabletInsertionEvent next = new 
PipeRawTabletInsertionEvent(tablet, isAligned);
 
             if (!hasNext()) {
               close();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
index 274553301f3..2b2be4abc21 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -38,7 +38,8 @@ public class PipeRealtimeCollectEventFactory {
   public static PipeRealtimeCollectEvent createCollectEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
-        new PipeInsertNodeTabletInsertionEvent(walEntryHandler, 
insertNode.getProgressIndex()),
+        new PipeInsertNodeTabletInsertionEvent(
+            walEntryHandler, insertNode.getProgressIndex(), 
insertNode.isAligned()),
         insertNode,
         resource);
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index abd0cf9b056..00d44e8ade2 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -41,7 +41,9 @@ import java.util.Arrays;
 public class PipeTabletInsertionEventTest {
 
   private InsertRowNode insertRowNode;
+  private InsertRowNode insertRowNodeAligned;
   private InsertTabletNode insertTabletNode;
+  private InsertTabletNode insertTabletNodeAligned;
 
   final String deviceId = "root.sg.d1";
   final long[] times = new long[] {110L, 111L, 112L, 113L, 114L};
@@ -98,6 +100,18 @@ public class PipeTabletInsertionEventTest {
             times[0],
             values,
             false);
+
+    insertRowNodeAligned =
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath(deviceId),
+            true,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times[0],
+            values,
+            false);
   }
 
   private void createInsertTabletNode() throws IllegalPathException {
@@ -131,6 +145,19 @@ public class PipeTabletInsertionEventTest {
             null,
             values,
             times.length);
+
+    this.insertTabletNodeAligned =
+        new InsertTabletNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath(deviceId),
+            true,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times,
+            null,
+            values,
+            times.length);
   }
 
   private void createTablet() {
@@ -194,18 +221,59 @@ public class PipeTabletInsertionEventTest {
 
   @Test
   public void convertToTabletForTest() {
-    Tablet tablet1 = new TabletInsertionDataContainer(insertRowNode, 
pattern).convertToTablet();
+    TabletInsertionDataContainer container1 =
+        new TabletInsertionDataContainer(insertRowNode, pattern);
+    Tablet tablet1 = container1.convertToTablet();
+    boolean isAligned1 = container1.isAligned();
+    Assert.assertEquals(tablet1, tabletForInsertRowNode);
+    Assert.assertFalse(isAligned1);
+
+    TabletInsertionDataContainer container2 =
+        new TabletInsertionDataContainer(insertTabletNode, pattern);
+    Tablet tablet2 = container2.convertToTablet();
+    boolean isAligned2 = container2.isAligned();
+    Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+    Assert.assertFalse(isAligned2);
+
+    PipeRawTabletInsertionEvent event3 = new 
PipeRawTabletInsertionEvent(tablet1, false, pattern);
+    Tablet tablet3 = event3.convertToTablet();
+    boolean isAligned3 = event3.isAligned();
+    Assert.assertEquals(tablet1, tablet3);
+    Assert.assertFalse(isAligned3);
+
+    PipeRawTabletInsertionEvent event4 = new 
PipeRawTabletInsertionEvent(tablet2, false, pattern);
+    Tablet tablet4 = event4.convertToTablet();
+    boolean isAligned4 = event4.isAligned();
+    Assert.assertEquals(tablet2, tablet4);
+    Assert.assertFalse(isAligned4);
+  }
+
+  @Test
+  public void convertToAlignedTabletForTest() {
+    TabletInsertionDataContainer container1 =
+        new TabletInsertionDataContainer(insertRowNodeAligned, pattern);
+    Tablet tablet1 = container1.convertToTablet();
+    boolean isAligned1 = container1.isAligned();
     Assert.assertEquals(tablet1, tabletForInsertRowNode);
+    Assert.assertTrue(isAligned1);
 
-    Tablet tablet2 = new TabletInsertionDataContainer(insertTabletNode, 
pattern).convertToTablet();
+    TabletInsertionDataContainer container2 =
+        new TabletInsertionDataContainer(insertTabletNodeAligned, pattern);
+    Tablet tablet2 = container2.convertToTablet();
+    boolean isAligned2 = container2.isAligned();
     Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+    Assert.assertTrue(isAligned2);
 
-    PipeRawTabletInsertionEvent event3 = new 
PipeRawTabletInsertionEvent(tablet1, pattern);
+    PipeRawTabletInsertionEvent event3 = new 
PipeRawTabletInsertionEvent(tablet1, true, pattern);
     Tablet tablet3 = event3.convertToTablet();
+    boolean isAligned3 = event3.isAligned();
     Assert.assertEquals(tablet1, tablet3);
+    Assert.assertTrue(isAligned3);
 
-    PipeRawTabletInsertionEvent event4 = new 
PipeRawTabletInsertionEvent(tablet2, pattern);
+    PipeRawTabletInsertionEvent event4 = new 
PipeRawTabletInsertionEvent(tablet2, true, pattern);
     Tablet tablet4 = event4.convertToTablet();
+    boolean isAligned4 = event4.isAligned();
     Assert.assertEquals(tablet2, tablet4);
+    Assert.assertTrue(isAligned4);
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 8b797abcfff..642a66dc660 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -231,7 +231,7 @@ public class TsFileInsertionDataContainerTest {
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
                                               (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                new 
PipeRawTabletInsertionEvent(tablet, false)
                                                     .processRowByRow(
                                                         (row, collector) -> {
                                                           try {
@@ -256,7 +256,7 @@ public class TsFileInsertionDataContainerTest {
                   event
                       .processTablet(
                           (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet)
+                            new PipeRawTabletInsertionEvent(tablet, false)
                                 .processRowByRow(
                                     (row, collector) -> {
                                       try {
@@ -380,7 +380,7 @@ public class TsFileInsertionDataContainerTest {
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
                                               (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                new 
PipeRawTabletInsertionEvent(tablet, false)
                                                     .processRowByRow(
                                                         (row, collector) -> {
                                                           try {
@@ -405,7 +405,7 @@ public class TsFileInsertionDataContainerTest {
                   event
                       .processTablet(
                           (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet)
+                            new PipeRawTabletInsertionEvent(tablet, false)
                                 .processRowByRow(
                                     (row, collector) -> {
                                       try {
@@ -494,7 +494,7 @@ public class TsFileInsertionDataContainerTest {
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
                                               (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                new 
PipeRawTabletInsertionEvent(tablet, false)
                                                     .processRowByRow(
                                                         (row, collector) -> {
                                                           try {
@@ -518,7 +518,7 @@ public class TsFileInsertionDataContainerTest {
                   event
                       .processTablet(
                           (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet)
+                            new PipeRawTabletInsertionEvent(tablet, false)
                                 .processRowByRow(
                                     (row, collector) -> {
                                       try {
@@ -606,7 +606,7 @@ public class TsFileInsertionDataContainerTest {
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
                                               (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                new 
PipeRawTabletInsertionEvent(tablet, false)
                                                     .processRowByRow(
                                                         (row, collector) -> {
                                                           try {
@@ -630,7 +630,7 @@ public class TsFileInsertionDataContainerTest {
                   event
                       .processTablet(
                           (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet)
+                            new PipeRawTabletInsertionEvent(tablet, false)
                                 .processRowByRow(
                                     (row, collector) -> {
                                       try {


Reply via email to