This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5990-0.12 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca949b3904677250d32d2d3e85994d1fd07dff44 Author: 马子坤 <[email protected]> AuthorDate: Sun Jun 18 06:45:53 2023 +0800 [IOTDB-5990][IOTDB-5991] Pipe: support transferring aligned tablets (#10137) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 25b70f8b884294ccccdd7710d5e39f382102ab4f) --- .../pipe/connector/legacy/IoTDBSyncConnector.java | 15 ++++- .../pipe/connector/v1/IoTDBThriftConnectorV1.java | 3 +- .../v1/request/PipeTransferTabletReq.java | 48 +++++++++----- .../pipe/connector/v2/IoTDBThriftConnectorV2.java | 7 +- .../iotdb/db/pipe/event/common/row/PipeRow.java | 7 ++ .../db/pipe/event/common/row/PipeRowCollector.java | 4 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 19 ++++-- .../common/tablet/PipeRawTabletInsertionEvent.java | 20 ++++-- .../tablet/TabletInsertionDataContainer.java | 17 +++-- .../tsfile/TsFileInsertionDataContainer.java | 21 +++++- .../realtime/PipeRealtimeCollectEventFactory.java | 3 +- .../pipe/event/PipeTabletInsertionEventTest.java | 76 ++++++++++++++++++++-- .../event/TsFileInsertionDataContainerTest.java | 16 ++--- 13 files changed, 203 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java index b4f48208c72..ecb03c87a7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java @@ -46,6 +46,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.commons.lang.NotImplementedException; import org.apache.thrift.TException; @@ -183,12 +184,22 @@ public class IoTDBSyncConnector implements PipeConnector { private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent) throws IoTDBConnectionException, StatementExecutionException { - sessionPool.insertTablet(pipeInsertNodeInsertionEvent.convertToTablet()); + final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet(); + if (pipeInsertNodeInsertionEvent.isAligned()) { + sessionPool.insertAlignedTablet(tablet); + } else { + sessionPool.insertTablet(tablet); + } } private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent) throws PipeException, TException, IoTDBConnectionException, StatementExecutionException { - sessionPool.insertTablet(pipeTabletInsertionEvent.convertToTablet()); + final Tablet tablet = pipeTabletInsertionEvent.convertToTablet(); + if (pipeTabletInsertionEvent.isAligned()) { + sessionPool.insertAlignedTablet(tablet); + } else { + sessionPool.insertTablet(tablet); + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java index e863bad5dd9..f0c4d54d929 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java @@ -162,7 +162,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { final TPipeTransferResp resp = client.pipeTransfer( PipeTransferTabletReq.toTPipeTransferReq( - pipeRawTabletInsertionEvent.convertToTablet())); + pipeRawTabletInsertionEvent.convertToTablet(), + pipeRawTabletInsertionEvent.isAligned())); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new PipeException( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java index ead686a6acf..f69fbee9695 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java @@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -39,22 +41,35 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; public class PipeTransferTabletReq extends TPipeTransferReq { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletReq.class); + private Tablet tablet; + private boolean isAligned; - public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet) throws IOException { + public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet, boolean isAligned) + throws IOException { final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq(); tabletReq.tablet = tablet; tabletReq.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion(); tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType(); - tabletReq.body = tablet.serialize(); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + tablet.serialize(outputStream); + ReadWriteIOUtils.write(isAligned, outputStream); + tabletReq.body = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + return tabletReq; } @@ -67,7 +82,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq { return true; } - public static void sortTablet(Tablet tablet) { + private static void sortTablet(Tablet tablet) { /* * following part of code sort the batch data by time, * so we can insert continuous data in value list to get a better performance @@ -180,6 +195,19 @@ public class PipeTransferTabletReq extends TPipeTransferReq { return sortedBitMap; } + public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq transferReq) { + final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq(); + + tabletReq.tablet = Tablet.deserialize(transferReq.body); + tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body); + + tabletReq.version = transferReq.version; + tabletReq.type = transferReq.type; + tabletReq.body = transferReq.body; + + return tabletReq; + } + public InsertTabletStatement constructStatement() { if (!checkSorted(tablet)) { sortTablet(tablet); @@ -194,7 +222,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq { } request.setPrefixPath(tablet.deviceId); - request.setIsAligned(false); + request.setIsAligned(isAligned); request.setTimestamps(SessionUtils.getTimeBuffer(tablet)); request.setValues(SessionUtils.getValueBuffer(tablet)); request.setSize(tablet.rowSize); @@ -207,16 +235,4 @@ public class PipeTransferTabletReq extends TPipeTransferReq { return null; } } - - public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq transferReq) { - final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq(); - - tabletReq.tablet = Tablet.deserialize(transferReq.body); - - tabletReq.version = transferReq.version; - tabletReq.type = transferReq.type; - tabletReq.body = transferReq.body; - - return tabletReq; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java index e22d2620cc9..d5735aa95e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java @@ -26,8 +26,6 @@ import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq; @@ -75,7 +73,6 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV2.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); private static final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER = @@ -182,7 +179,9 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent; final PipeTransferTabletReq pipeTransferTabletReq = - PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet()); + PipeTransferTabletReq.toTPipeTransferReq( + pipeRawTabletInsertionEvent.convertToTablet(), + pipeRawTabletInsertionEvent.isAligned()); final PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler = new PipeTransferRawTabletInsertionEventHandler( requestCommitId, pipeTransferTabletReq, this); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java index 24ad5e108fc..f259b84a2e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java @@ -35,6 +35,7 @@ public class PipeRow implements Row { private final int rowIndex; private final String deviceId; + private final boolean isAligned; private final MeasurementSchema[] measurementSchemaList; private final long[] timestampColumn; @@ -47,6 +48,7 @@ public class PipeRow implements Row { public PipeRow( int rowIndex, String deviceId, + boolean isAligned, MeasurementSchema[] measurementSchemaList, long[] timestampColumn, TSDataType[] valueColumnTypes, @@ -55,6 +57,7 @@ public class PipeRow implements Row { String[] columnNameStringList) { this.rowIndex = rowIndex; this.deviceId = deviceId; + this.isAligned = isAligned; this.measurementSchemaList = measurementSchemaList; this.timestampColumn = timestampColumn; this.valueColumnTypes = valueColumnTypes; @@ -165,6 +168,10 @@ public class PipeRow implements Row { return deviceId; } + public boolean isAligned() { + return isAligned; + } + public MeasurementSchema[] getMeasurementSchemaList() { return measurementSchemaList; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index ea989d48027..ac0cbc6e96f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -35,6 +35,7 @@ public class PipeRowCollector implements RowCollector { private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>(); private Tablet tablet = null; + private boolean isAligned = false; @Override public void collectRow(Row row) { @@ -50,6 +51,7 @@ public class PipeRowCollector implements RowCollector { final List<MeasurementSchema> measurementSchemaList = new ArrayList<>(Arrays.asList(measurementSchemaArray)); tablet = new Tablet(deviceId, measurementSchemaList); + isAligned = pipeRow.isAligned(); tablet.initBitMaps(); } @@ -78,7 +80,7 @@ public class PipeRowCollector implements RowCollector { private void collectTabletInsertionEvent() { if (tablet != null) { - tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet)); + tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet, isAligned)); } this.tablet = null; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 5623d7715a6..0933e1a3c71 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -45,22 +45,25 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent private final WALEntryHandler walEntryHandler; private final ProgressIndex progressIndex; + private final boolean isAligned; private TabletInsertionDataContainer dataContainer; public PipeInsertNodeTabletInsertionEvent( - WALEntryHandler walEntryHandler, ProgressIndex progressIndex) { - this(walEntryHandler, progressIndex, null, null); + WALEntryHandler walEntryHandler, ProgressIndex progressIndex, boolean isAligned) { + this(walEntryHandler, progressIndex, isAligned, null, null); } private PipeInsertNodeTabletInsertionEvent( WALEntryHandler walEntryHandler, ProgressIndex progressIndex, + boolean isAligned, PipeTaskMeta pipeTaskMeta, String pattern) { super(pipeTaskMeta, pattern); this.walEntryHandler = walEntryHandler; this.progressIndex = progressIndex; + this.isAligned = isAligned; } public InsertNode getInsertNode() throws WALPipeException { @@ -108,7 +111,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( PipeTaskMeta pipeTaskMeta, String pattern) { return new PipeInsertNodeTabletInsertionEvent( - walEntryHandler, progressIndex, pipeTaskMeta, pattern); + walEntryHandler, progressIndex, isAligned, pipeTaskMeta, pattern); } /////////////////////////// TabletInsertionEvent /////////////////////////// @@ -139,6 +142,12 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } } + /////////////////////////// convertToTablet /////////////////////////// + + public boolean isAligned() { + return isAligned; + } + public Tablet convertToTablet() { try { if (dataContainer == null) { @@ -155,11 +164,13 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent @Override public String toString() { - return "PipeRawTabletInsertionEvent{" + return "PipeInsertNodeTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler + ", progressIndex=" + progressIndex + + ", isAligned=" + + isAligned + '}'; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index c8bc2fd55ab..594c0a38291 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -31,16 +31,18 @@ import java.util.function.BiConsumer; public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { private final Tablet tablet; + private final boolean isAligned; private final String pattern; private TabletInsertionDataContainer dataContainer; - public PipeRawTabletInsertionEvent(Tablet tablet) { - this(Objects.requireNonNull(tablet), null); + public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) { + this(tablet, isAligned, null); } - public PipeRawTabletInsertionEvent(Tablet tablet, String pattern) { + public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String pattern) { this.tablet = Objects.requireNonNull(tablet); + this.isAligned = isAligned; this.pattern = pattern; } @@ -53,7 +55,7 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { @Override public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); + dataContainer = new TabletInsertionDataContainer(tablet, isAligned, getPattern()); } return dataContainer.processRowByRow(consumer); } @@ -61,11 +63,17 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { @Override public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) { if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); + dataContainer = new TabletInsertionDataContainer(tablet, isAligned, getPattern()); } return dataContainer.processTablet(consumer); } + /////////////////////////// convertToTablet /////////////////////////// + + public boolean isAligned() { + return isAligned; + } + public Tablet convertToTablet() { final String pattern = getPattern(); @@ -76,7 +84,7 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { // if pattern is not "root", we need to convert the tablet if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(tablet, pattern); + dataContainer = new TabletInsertionDataContainer(tablet, isAligned, pattern); } return dataContainer.convertToTablet(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 49275556cd2..86fbbd28f6e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -48,6 +48,7 @@ import java.util.stream.IntStream; public class TabletInsertionDataContainer { private String deviceId; + private boolean isAligned; private MeasurementSchema[] measurementSchemaList; private String[] columnNameStringList; @@ -76,8 +77,8 @@ public class TabletInsertionDataContainer { } } - public TabletInsertionDataContainer(Tablet tablet, String pattern) { - parse(tablet, pattern); + public TabletInsertionDataContainer(Tablet tablet, boolean isAligned, String pattern) { + parse(tablet, isAligned, pattern); } //////////////////////////// parse //////////////////////////// @@ -87,6 +88,7 @@ public class TabletInsertionDataContainer { final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; this.deviceId = insertRowNode.getDevicePath().getFullPath(); + this.isAligned = insertRowNode.isAligned(); this.timestampColumn = new long[] {insertRowNode.getTime()}; generateColumnIndexMapper( @@ -153,6 +155,7 @@ public class TabletInsertionDataContainer { final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; this.deviceId = insertTabletNode.getDevicePath().getFullPath(); + this.isAligned = insertTabletNode.isAligned(); this.timestampColumn = insertTabletNode.getTimes(); generateColumnIndexMapper( @@ -204,11 +207,12 @@ public class TabletInsertionDataContainer { rowCount = timestampColumn.length; } - private void parse(Tablet tablet, String pattern) { + private void parse(Tablet tablet, boolean isAligned, String pattern) { final int originColumnSize = tablet.getSchemas().size(); final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; this.deviceId = tablet.deviceId; + this.isAligned = isAligned; this.timestampColumn = tablet.timestamps; final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); @@ -311,6 +315,7 @@ public class TabletInsertionDataContainer { new PipeRow( i, deviceId, + isAligned, measurementSchemaList, timestampColumn, valueColumnTypes, @@ -328,7 +333,11 @@ public class TabletInsertionDataContainer { return rowCollector.convertToTabletInsertionEvents(); } - //////////////////////////// convert //////////////////////////// + //////////////////////////// convertToTablet //////////////////////////// + + public boolean isAligned() { + return isAligned; + } public Tablet convertToTablet() { if (tablet != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index e9347d31202..a9a5f657d83 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -24,12 +24,15 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileDeviceIterator; import org.apache.iotdb.tsfile.read.TsFileReader; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +58,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable { private final TsFileReader tsFileReader; private final Iterator<Map.Entry<String, List<String>>> deviceMeasurementsMapIterator; + private final Map<String, Boolean> deviceIsAlignedMap; private final Map<String, TSDataType> measurementDataTypeMap; public TsFileInsertionDataContainer(File tsFile, String pattern, long startTime, long endTime) @@ -72,6 +76,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable { tsFileReader = new TsFileReader(tsFileSequenceReader); deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern().entrySet().iterator(); + deviceIsAlignedMap = readDeviceIsAlignedMap(); measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); } catch (Exception e) { LOGGER.error("failed to create TsFileInsertionDataContainer", e); @@ -119,6 +124,17 @@ public class TsFileInsertionDataContainer implements AutoCloseable { return filteredDeviceMeasurementsMap; } + private Map<String, Boolean> readDeviceIsAlignedMap() throws IOException { + final Map<String, Boolean> deviceIsAlignedMap = new HashMap<>(); + final TsFileDeviceIterator deviceIsAlignedIterator = + tsFileSequenceReader.getAllDevicesIteratorWithIsAligned(); + while (deviceIsAlignedIterator.hasNext()) { + final Pair<String, Boolean> deviceIsAlignedPair = deviceIsAlignedIterator.next(); + deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), deviceIsAlignedPair.getRight()); + } + return deviceIsAlignedMap; + } + /** @return TabletInsertionEvent in a streaming way */ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { return () -> @@ -159,8 +175,9 @@ public class TsFileInsertionDataContainer implements AutoCloseable { throw new NoSuchElementException(); } - final TabletInsertionEvent next = - new PipeRawTabletInsertionEvent(tabletIterator.next()); + final Tablet tablet = tabletIterator.next(); + final boolean isAligned = deviceIsAlignedMap.getOrDefault(tablet.deviceId, false); + final TabletInsertionEvent next = new PipeRawTabletInsertionEvent(tablet, isAligned); if (!hasNext()) { close(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java index 274553301f3..2b2be4abc21 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java @@ -38,7 +38,8 @@ public class PipeRealtimeCollectEventFactory { public static PipeRealtimeCollectEvent createCollectEvent( WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( - new PipeInsertNodeTabletInsertionEvent(walEntryHandler, insertNode.getProgressIndex()), + new PipeInsertNodeTabletInsertionEvent( + walEntryHandler, insertNode.getProgressIndex(), insertNode.isAligned()), insertNode, resource); } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index abd0cf9b056..00d44e8ade2 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -41,7 +41,9 @@ import java.util.Arrays; public class PipeTabletInsertionEventTest { private InsertRowNode insertRowNode; + private InsertRowNode insertRowNodeAligned; private InsertTabletNode insertTabletNode; + private InsertTabletNode insertTabletNodeAligned; final String deviceId = "root.sg.d1"; final long[] times = new long[] {110L, 111L, 112L, 113L, 114L}; @@ -98,6 +100,18 @@ public class PipeTabletInsertionEventTest { times[0], values, false); + + insertRowNodeAligned = + new InsertRowNode( + new PlanNodeId("plan node 1"), + new PartialPath(deviceId), + true, + measurementIds, + dataTypes, + schemas, + times[0], + values, + false); } private void createInsertTabletNode() throws IllegalPathException { @@ -131,6 +145,19 @@ public class PipeTabletInsertionEventTest { null, values, times.length); + + this.insertTabletNodeAligned = + new InsertTabletNode( + new PlanNodeId("plannode 1"), + new PartialPath(deviceId), + true, + measurementIds, + dataTypes, + schemas, + times, + null, + values, + times.length); } private void createTablet() { @@ -194,18 +221,59 @@ public class PipeTabletInsertionEventTest { @Test public void convertToTabletForTest() { - Tablet tablet1 = new TabletInsertionDataContainer(insertRowNode, pattern).convertToTablet(); + TabletInsertionDataContainer container1 = + new TabletInsertionDataContainer(insertRowNode, pattern); + Tablet tablet1 = container1.convertToTablet(); + boolean isAligned1 = container1.isAligned(); + Assert.assertEquals(tablet1, tabletForInsertRowNode); + Assert.assertFalse(isAligned1); + + TabletInsertionDataContainer container2 = + new TabletInsertionDataContainer(insertTabletNode, pattern); + Tablet tablet2 = container2.convertToTablet(); + boolean isAligned2 = container2.isAligned(); + Assert.assertEquals(tablet2, tabletForInsertTabletNode); + Assert.assertFalse(isAligned2); + + PipeRawTabletInsertionEvent event3 = new PipeRawTabletInsertionEvent(tablet1, false, pattern); + Tablet tablet3 = event3.convertToTablet(); + boolean isAligned3 = event3.isAligned(); + Assert.assertEquals(tablet1, tablet3); + Assert.assertFalse(isAligned3); + + PipeRawTabletInsertionEvent event4 = new PipeRawTabletInsertionEvent(tablet2, false, pattern); + Tablet tablet4 = event4.convertToTablet(); + boolean isAligned4 = event4.isAligned(); + Assert.assertEquals(tablet2, tablet4); + Assert.assertFalse(isAligned4); + } + + @Test + public void convertToAlignedTabletForTest() { + TabletInsertionDataContainer container1 = + new TabletInsertionDataContainer(insertRowNodeAligned, pattern); + Tablet tablet1 = container1.convertToTablet(); + boolean isAligned1 = container1.isAligned(); Assert.assertEquals(tablet1, tabletForInsertRowNode); + Assert.assertTrue(isAligned1); - Tablet tablet2 = new TabletInsertionDataContainer(insertTabletNode, pattern).convertToTablet(); + TabletInsertionDataContainer container2 = + new TabletInsertionDataContainer(insertTabletNodeAligned, pattern); + Tablet tablet2 = container2.convertToTablet(); + boolean isAligned2 = container2.isAligned(); Assert.assertEquals(tablet2, tabletForInsertTabletNode); + Assert.assertTrue(isAligned2); - PipeRawTabletInsertionEvent event3 = new PipeRawTabletInsertionEvent(tablet1, pattern); + PipeRawTabletInsertionEvent event3 = new PipeRawTabletInsertionEvent(tablet1, true, pattern); Tablet tablet3 = event3.convertToTablet(); + boolean isAligned3 = event3.isAligned(); Assert.assertEquals(tablet1, tablet3); + Assert.assertTrue(isAligned3); - PipeRawTabletInsertionEvent event4 = new PipeRawTabletInsertionEvent(tablet2, pattern); + PipeRawTabletInsertionEvent event4 = new PipeRawTabletInsertionEvent(tablet2, true, pattern); Tablet tablet4 = event4.convertToTablet(); + boolean isAligned4 = event4.isAligned(); Assert.assertEquals(tablet2, tablet4); + Assert.assertTrue(isAligned4); } } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 8b797abcfff..642a66dc660 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -231,7 +231,7 @@ public class TsFileInsertionDataContainerTest { tabletInsertionEvent2 -> tabletInsertionEvent2.processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -256,7 +256,7 @@ public class TsFileInsertionDataContainerTest { event .processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -380,7 +380,7 @@ public class TsFileInsertionDataContainerTest { tabletInsertionEvent2 -> tabletInsertionEvent2.processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -405,7 +405,7 @@ public class TsFileInsertionDataContainerTest { event .processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -494,7 +494,7 @@ public class TsFileInsertionDataContainerTest { tabletInsertionEvent2 -> tabletInsertionEvent2.processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -518,7 +518,7 @@ public class TsFileInsertionDataContainerTest { event .processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -606,7 +606,7 @@ public class TsFileInsertionDataContainerTest { tabletInsertionEvent2 -> tabletInsertionEvent2.processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try { @@ -630,7 +630,7 @@ public class TsFileInsertionDataContainerTest { event .processTablet( (tablet, rowCollector) -> { - new PipeRawTabletInsertionEvent(tablet) + new PipeRawTabletInsertionEvent(tablet, false) .processRowByRow( (row, collector) -> { try {
