This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 545dadf8176 [IOTDB-5990][IOTDB-5991] Pipe: support transferring aligned tablets (#10137) (#10202)
545dadf8176 is described below
commit 545dadf8176d4e5927855b45351b959614bf2ea1
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Jun 18 07:20:14 2023 +0800
[IOTDB-5990][IOTDB-5991] Pipe: support transferring aligned tablets (#10137) (#10202)
(cherry picked from commit 25b70f8b884294ccccdd7710d5e39f382102ab4f)
Co-authored-by: Steve Yurong Su <[email protected]>
Co-authored-by: 马子坤 <[email protected]>
---
.../pipe/connector/legacy/IoTDBSyncConnector.java | 15 ++++-
.../pipe/connector/v1/IoTDBThriftConnectorV1.java | 3 +-
.../v1/request/PipeTransferTabletReq.java | 48 +++++++++-----
.../pipe/connector/v2/IoTDBThriftConnectorV2.java | 4 +-
.../iotdb/db/pipe/event/common/row/PipeRow.java | 7 ++
.../db/pipe/event/common/row/PipeRowCollector.java | 4 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 19 ++++--
.../common/tablet/PipeRawTabletInsertionEvent.java | 20 ++++--
.../tablet/TabletInsertionDataContainer.java | 17 +++--
.../tsfile/TsFileInsertionDataContainer.java | 21 +++++-
.../realtime/PipeRealtimeCollectEventFactory.java | 3 +-
.../pipe/event/PipeTabletInsertionEventTest.java | 76 ++++++++++++++++++++--
.../event/TsFileInsertionDataContainerTest.java | 16 ++---
13 files changed, 203 insertions(+), 50 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
index b4f48208c72..ecb03c87a7f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.commons.lang.NotImplementedException;
import org.apache.thrift.TException;
@@ -183,12 +184,22 @@ public class IoTDBSyncConnector implements PipeConnector {
private void doTransfer(PipeInsertNodeTabletInsertionEvent
pipeInsertNodeInsertionEvent)
throws IoTDBConnectionException, StatementExecutionException {
- sessionPool.insertTablet(pipeInsertNodeInsertionEvent.convertToTablet());
+ final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
+ if (pipeInsertNodeInsertionEvent.isAligned()) {
+ sessionPool.insertAlignedTablet(tablet);
+ } else {
+ sessionPool.insertTablet(tablet);
+ }
}
private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
throws PipeException, TException, IoTDBConnectionException,
StatementExecutionException {
- sessionPool.insertTablet(pipeTabletInsertionEvent.convertToTablet());
+ final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
+ if (pipeTabletInsertionEvent.isAligned()) {
+ sessionPool.insertAlignedTablet(tablet);
+ } else {
+ sessionPool.insertTablet(tablet);
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index e863bad5dd9..f0c4d54d929 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -162,7 +162,8 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferTabletReq.toTPipeTransferReq(
- pipeRawTabletInsertionEvent.convertToTablet()));
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned()));
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
index ead686a6acf..f69fbee9695 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
@@ -32,6 +32,8 @@ import
org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -39,22 +41,35 @@ import
org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
public class PipeTransferTabletReq extends TPipeTransferReq {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletReq.class);
+
private Tablet tablet;
+ private boolean isAligned;
- public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet) throws
IOException {
+ public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet,
boolean isAligned)
+ throws IOException {
final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
tabletReq.tablet = tablet;
tabletReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
- tabletReq.body = tablet.serialize();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ tabletReq.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
return tabletReq;
}
@@ -67,7 +82,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
return true;
}
- public static void sortTablet(Tablet tablet) {
+ private static void sortTablet(Tablet tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better
performance
@@ -180,6 +195,19 @@ public class PipeTransferTabletReq extends
TPipeTransferReq {
return sortedBitMap;
}
+ public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq
transferReq) {
+ final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+
+ tabletReq.tablet = Tablet.deserialize(transferReq.body);
+ tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
+
+ tabletReq.version = transferReq.version;
+ tabletReq.type = transferReq.type;
+ tabletReq.body = transferReq.body;
+
+ return tabletReq;
+ }
+
public InsertTabletStatement constructStatement() {
if (!checkSorted(tablet)) {
sortTablet(tablet);
@@ -194,7 +222,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq
{
}
request.setPrefixPath(tablet.deviceId);
- request.setIsAligned(false);
+ request.setIsAligned(isAligned);
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
request.setValues(SessionUtils.getValueBuffer(tablet));
request.setSize(tablet.rowSize);
@@ -207,16 +235,4 @@ public class PipeTransferTabletReq extends
TPipeTransferReq {
return null;
}
}
-
- public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq
transferReq) {
- final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
-
- tabletReq.tablet = Tablet.deserialize(transferReq.body);
-
- tabletReq.version = transferReq.version;
- tabletReq.type = transferReq.type;
- tabletReq.body = transferReq.body;
-
- return tabletReq;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
index e22d2620cc9..8dffd1d544c 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -182,7 +182,9 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
final PipeTransferTabletReq pipeTransferTabletReq =
-
PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet());
+ PipeTransferTabletReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned());
final PipeTransferRawTabletInsertionEventHandler
pipeTransferTabletReqHandler =
new PipeTransferRawTabletInsertionEventHandler(
requestCommitId, pipeTransferTabletReq, this);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 24ad5e108fc..f259b84a2e7 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -35,6 +35,7 @@ public class PipeRow implements Row {
private final int rowIndex;
private final String deviceId;
+ private final boolean isAligned;
private final MeasurementSchema[] measurementSchemaList;
private final long[] timestampColumn;
@@ -47,6 +48,7 @@ public class PipeRow implements Row {
public PipeRow(
int rowIndex,
String deviceId,
+ boolean isAligned,
MeasurementSchema[] measurementSchemaList,
long[] timestampColumn,
TSDataType[] valueColumnTypes,
@@ -55,6 +57,7 @@ public class PipeRow implements Row {
String[] columnNameStringList) {
this.rowIndex = rowIndex;
this.deviceId = deviceId;
+ this.isAligned = isAligned;
this.measurementSchemaList = measurementSchemaList;
this.timestampColumn = timestampColumn;
this.valueColumnTypes = valueColumnTypes;
@@ -165,6 +168,10 @@ public class PipeRow implements Row {
return deviceId;
}
+ public boolean isAligned() {
+ return isAligned;
+ }
+
public MeasurementSchema[] getMeasurementSchemaList() {
return measurementSchemaList;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index ea989d48027..ac0cbc6e96f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -35,6 +35,7 @@ public class PipeRowCollector implements RowCollector {
private final List<TabletInsertionEvent> tabletInsertionEventList = new
ArrayList<>();
private Tablet tablet = null;
+ private boolean isAligned = false;
@Override
public void collectRow(Row row) {
@@ -50,6 +51,7 @@ public class PipeRowCollector implements RowCollector {
final List<MeasurementSchema> measurementSchemaList =
new ArrayList<>(Arrays.asList(measurementSchemaArray));
tablet = new Tablet(deviceId, measurementSchemaList);
+ isAligned = pipeRow.isAligned();
tablet.initBitMaps();
}
@@ -78,7 +80,7 @@ public class PipeRowCollector implements RowCollector {
private void collectTabletInsertionEvent() {
if (tablet != null) {
- tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet));
+ tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet,
isAligned));
}
this.tablet = null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 5623d7715a6..0933e1a3c71 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -45,22 +45,25 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private final WALEntryHandler walEntryHandler;
private final ProgressIndex progressIndex;
+ private final boolean isAligned;
private TabletInsertionDataContainer dataContainer;
public PipeInsertNodeTabletInsertionEvent(
- WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
- this(walEntryHandler, progressIndex, null, null);
+ WALEntryHandler walEntryHandler, ProgressIndex progressIndex, boolean
isAligned) {
+ this(walEntryHandler, progressIndex, isAligned, null, null);
}
private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
ProgressIndex progressIndex,
+ boolean isAligned,
PipeTaskMeta pipeTaskMeta,
String pattern) {
super(pipeTaskMeta, pattern);
this.walEntryHandler = walEntryHandler;
this.progressIndex = progressIndex;
+ this.isAligned = isAligned;
}
public InsertNode getInsertNode() throws WALPipeException {
@@ -108,7 +111,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeInsertNodeTabletInsertionEvent(
- walEntryHandler, progressIndex, pipeTaskMeta, pattern);
+ walEntryHandler, progressIndex, isAligned, pipeTaskMeta, pattern);
}
/////////////////////////// TabletInsertionEvent ///////////////////////////
@@ -139,6 +142,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
}
+ /////////////////////////// convertToTablet ///////////////////////////
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+
public Tablet convertToTablet() {
try {
if (dataContainer == null) {
@@ -155,11 +164,13 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public String toString() {
- return "PipeRawTabletInsertionEvent{"
+ return "PipeInsertNodeTabletInsertionEvent{"
+ "walEntryHandler="
+ walEntryHandler
+ ", progressIndex="
+ progressIndex
+ + ", isAligned="
+ + isAligned
+ '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index c8bc2fd55ab..594c0a38291 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -31,16 +31,18 @@ import java.util.function.BiConsumer;
public class PipeRawTabletInsertionEvent implements TabletInsertionEvent {
private final Tablet tablet;
+ private final boolean isAligned;
private final String pattern;
private TabletInsertionDataContainer dataContainer;
- public PipeRawTabletInsertionEvent(Tablet tablet) {
- this(Objects.requireNonNull(tablet), null);
+ public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
+ this(tablet, isAligned, null);
}
- public PipeRawTabletInsertionEvent(Tablet tablet, String pattern) {
+ public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String
pattern) {
this.tablet = Objects.requireNonNull(tablet);
+ this.isAligned = isAligned;
this.pattern = pattern;
}
@@ -53,7 +55,7 @@ public class PipeRawTabletInsertionEvent implements
TabletInsertionEvent {
@Override
public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+ dataContainer = new TabletInsertionDataContainer(tablet, isAligned,
getPattern());
}
return dataContainer.processRowByRow(consumer);
}
@@ -61,11 +63,17 @@ public class PipeRawTabletInsertionEvent implements
TabletInsertionEvent {
@Override
public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+ dataContainer = new TabletInsertionDataContainer(tablet, isAligned,
getPattern());
}
return dataContainer.processTablet(consumer);
}
+ /////////////////////////// convertToTablet ///////////////////////////
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+
public Tablet convertToTablet() {
final String pattern = getPattern();
@@ -76,7 +84,7 @@ public class PipeRawTabletInsertionEvent implements
TabletInsertionEvent {
// if pattern is not "root", we need to convert the tablet
if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(tablet, pattern);
+ dataContainer = new TabletInsertionDataContainer(tablet, isAligned,
pattern);
}
return dataContainer.convertToTablet();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 49275556cd2..86fbbd28f6e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
public class TabletInsertionDataContainer {
private String deviceId;
+ private boolean isAligned;
private MeasurementSchema[] measurementSchemaList;
private String[] columnNameStringList;
@@ -76,8 +77,8 @@ public class TabletInsertionDataContainer {
}
}
- public TabletInsertionDataContainer(Tablet tablet, String pattern) {
- parse(tablet, pattern);
+ public TabletInsertionDataContainer(Tablet tablet, boolean isAligned, String
pattern) {
+ parse(tablet, isAligned, pattern);
}
//////////////////////////// parse ////////////////////////////
@@ -87,6 +88,7 @@ public class TabletInsertionDataContainer {
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
this.deviceId = insertRowNode.getDevicePath().getFullPath();
+ this.isAligned = insertRowNode.isAligned();
this.timestampColumn = new long[] {insertRowNode.getTime()};
generateColumnIndexMapper(
@@ -153,6 +155,7 @@ public class TabletInsertionDataContainer {
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
this.deviceId = insertTabletNode.getDevicePath().getFullPath();
+ this.isAligned = insertTabletNode.isAligned();
this.timestampColumn = insertTabletNode.getTimes();
generateColumnIndexMapper(
@@ -204,11 +207,12 @@ public class TabletInsertionDataContainer {
rowCount = timestampColumn.length;
}
- private void parse(Tablet tablet, String pattern) {
+ private void parse(Tablet tablet, boolean isAligned, String pattern) {
final int originColumnSize = tablet.getSchemas().size();
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
this.deviceId = tablet.deviceId;
+ this.isAligned = isAligned;
this.timestampColumn = tablet.timestamps;
final List<MeasurementSchema> originMeasurementSchemaList =
tablet.getSchemas();
@@ -311,6 +315,7 @@ public class TabletInsertionDataContainer {
new PipeRow(
i,
deviceId,
+ isAligned,
measurementSchemaList,
timestampColumn,
valueColumnTypes,
@@ -328,7 +333,11 @@ public class TabletInsertionDataContainer {
return rowCollector.convertToTabletInsertionEvents();
}
- //////////////////////////// convert ////////////////////////////
+ //////////////////////////// convertToTablet ////////////////////////////
+
+ public boolean isAligned() {
+ return isAligned;
+ }
public Tablet convertToTablet() {
if (tablet != null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index e9347d31202..a9a5f657d83 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -24,12 +24,15 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +58,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
private final TsFileReader tsFileReader;
private final Iterator<Map.Entry<String, List<String>>>
deviceMeasurementsMapIterator;
+ private final Map<String, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
public TsFileInsertionDataContainer(File tsFile, String pattern, long
startTime, long endTime)
@@ -72,6 +76,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
tsFileReader = new TsFileReader(tsFileSequenceReader);
deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+ deviceIsAlignedMap = readDeviceIsAlignedMap();
measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
} catch (Exception e) {
LOGGER.error("failed to create TsFileInsertionDataContainer", e);
@@ -119,6 +124,17 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
return filteredDeviceMeasurementsMap;
}
+ private Map<String, Boolean> readDeviceIsAlignedMap() throws IOException {
+ final Map<String, Boolean> deviceIsAlignedMap = new HashMap<>();
+ final TsFileDeviceIterator deviceIsAlignedIterator =
+ tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIsAlignedIterator.hasNext()) {
+ final Pair<String, Boolean> deviceIsAlignedPair =
deviceIsAlignedIterator.next();
+ deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
+ }
+ return deviceIsAlignedMap;
+ }
+
/** @return TabletInsertionEvent in a streaming way */
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return () ->
@@ -159,8 +175,9 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
throw new NoSuchElementException();
}
- final TabletInsertionEvent next =
- new PipeRawTabletInsertionEvent(tabletIterator.next());
+ final Tablet tablet = tabletIterator.next();
+ final boolean isAligned =
deviceIsAlignedMap.getOrDefault(tablet.deviceId, false);
+ final TabletInsertionEvent next = new
PipeRawTabletInsertionEvent(tablet, isAligned);
if (!hasNext()) {
close();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
index 274553301f3..2b2be4abc21 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -38,7 +38,8 @@ public class PipeRealtimeCollectEventFactory {
public static PipeRealtimeCollectEvent createCollectEvent(
WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
- new PipeInsertNodeTabletInsertionEvent(walEntryHandler,
insertNode.getProgressIndex()),
+ new PipeInsertNodeTabletInsertionEvent(
+ walEntryHandler, insertNode.getProgressIndex(),
insertNode.isAligned()),
insertNode,
resource);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index abd0cf9b056..00d44e8ade2 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -41,7 +41,9 @@ import java.util.Arrays;
public class PipeTabletInsertionEventTest {
private InsertRowNode insertRowNode;
+ private InsertRowNode insertRowNodeAligned;
private InsertTabletNode insertTabletNode;
+ private InsertTabletNode insertTabletNodeAligned;
final String deviceId = "root.sg.d1";
final long[] times = new long[] {110L, 111L, 112L, 113L, 114L};
@@ -98,6 +100,18 @@ public class PipeTabletInsertionEventTest {
times[0],
values,
false);
+
+ insertRowNodeAligned =
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ new PartialPath(deviceId),
+ true,
+ measurementIds,
+ dataTypes,
+ schemas,
+ times[0],
+ values,
+ false);
}
private void createInsertTabletNode() throws IllegalPathException {
@@ -131,6 +145,19 @@ public class PipeTabletInsertionEventTest {
null,
values,
times.length);
+
+ this.insertTabletNodeAligned =
+ new InsertTabletNode(
+ new PlanNodeId("plannode 1"),
+ new PartialPath(deviceId),
+ true,
+ measurementIds,
+ dataTypes,
+ schemas,
+ times,
+ null,
+ values,
+ times.length);
}
private void createTablet() {
@@ -194,18 +221,59 @@ public class PipeTabletInsertionEventTest {
@Test
public void convertToTabletForTest() {
- Tablet tablet1 = new TabletInsertionDataContainer(insertRowNode,
pattern).convertToTablet();
+ TabletInsertionDataContainer container1 =
+ new TabletInsertionDataContainer(insertRowNode, pattern);
+ Tablet tablet1 = container1.convertToTablet();
+ boolean isAligned1 = container1.isAligned();
+ Assert.assertEquals(tablet1, tabletForInsertRowNode);
+ Assert.assertFalse(isAligned1);
+
+ TabletInsertionDataContainer container2 =
+ new TabletInsertionDataContainer(insertTabletNode, pattern);
+ Tablet tablet2 = container2.convertToTablet();
+ boolean isAligned2 = container2.isAligned();
+ Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+ Assert.assertFalse(isAligned2);
+
+ PipeRawTabletInsertionEvent event3 = new
PipeRawTabletInsertionEvent(tablet1, false, pattern);
+ Tablet tablet3 = event3.convertToTablet();
+ boolean isAligned3 = event3.isAligned();
+ Assert.assertEquals(tablet1, tablet3);
+ Assert.assertFalse(isAligned3);
+
+ PipeRawTabletInsertionEvent event4 = new
PipeRawTabletInsertionEvent(tablet2, false, pattern);
+ Tablet tablet4 = event4.convertToTablet();
+ boolean isAligned4 = event4.isAligned();
+ Assert.assertEquals(tablet2, tablet4);
+ Assert.assertFalse(isAligned4);
+ }
+
+ @Test
+ public void convertToAlignedTabletForTest() {
+ TabletInsertionDataContainer container1 =
+ new TabletInsertionDataContainer(insertRowNodeAligned, pattern);
+ Tablet tablet1 = container1.convertToTablet();
+ boolean isAligned1 = container1.isAligned();
Assert.assertEquals(tablet1, tabletForInsertRowNode);
+ Assert.assertTrue(isAligned1);
- Tablet tablet2 = new TabletInsertionDataContainer(insertTabletNode,
pattern).convertToTablet();
+ TabletInsertionDataContainer container2 =
+ new TabletInsertionDataContainer(insertTabletNodeAligned, pattern);
+ Tablet tablet2 = container2.convertToTablet();
+ boolean isAligned2 = container2.isAligned();
Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+ Assert.assertTrue(isAligned2);
- PipeRawTabletInsertionEvent event3 = new
PipeRawTabletInsertionEvent(tablet1, pattern);
+ PipeRawTabletInsertionEvent event3 = new
PipeRawTabletInsertionEvent(tablet1, true, pattern);
Tablet tablet3 = event3.convertToTablet();
+ boolean isAligned3 = event3.isAligned();
Assert.assertEquals(tablet1, tablet3);
+ Assert.assertTrue(isAligned3);
- PipeRawTabletInsertionEvent event4 = new
PipeRawTabletInsertionEvent(tablet2, pattern);
+ PipeRawTabletInsertionEvent event4 = new
PipeRawTabletInsertionEvent(tablet2, true, pattern);
Tablet tablet4 = event4.convertToTablet();
+ boolean isAligned4 = event4.isAligned();
Assert.assertEquals(tablet2, tablet4);
+ Assert.assertTrue(isAligned4);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 8b797abcfff..642a66dc660 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -231,7 +231,7 @@ public class TsFileInsertionDataContainerTest {
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
(tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet)
+ new
PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -256,7 +256,7 @@ public class TsFileInsertionDataContainerTest {
event
.processTablet(
(tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet)
+ new PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -380,7 +380,7 @@ public class TsFileInsertionDataContainerTest {
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
(tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet)
+ new
PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -405,7 +405,7 @@ public class TsFileInsertionDataContainerTest {
event
.processTablet(
(tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet)
+ new PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -494,7 +494,7 @@ public class TsFileInsertionDataContainerTest {
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
(tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet)
+ new
PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -518,7 +518,7 @@ public class TsFileInsertionDataContainerTest {
event
.processTablet(
(tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet)
+ new PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -606,7 +606,7 @@ public class TsFileInsertionDataContainerTest {
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
(tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet)
+ new
PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {
@@ -630,7 +630,7 @@ public class TsFileInsertionDataContainerTest {
event
.processTablet(
(tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet)
+ new PipeRawTabletInsertionEvent(tablet, false)
.processRowByRow(
(row, collector) -> {
try {