This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c30bb7c00d [IOTDB-3296] ext-pipe suport .mods file (#6102)
c30bb7c00d is described below
commit c30bb7c00de32bc8caf0e764c3702a52880cec59
Author: Jamber <[email protected]>
AuthorDate: Wed Aug 10 11:46:48 2022 +0800
[IOTDB-3296] ext-pipe suport .mods file (#6102)
---
.../iotdb/commons/concurrent/ThreadName.java | 3 +-
.../iotdb/db/sync/datasource/AbstractOpBlock.java | 16 +-
.../iotdb/db/sync/datasource/DeletionGroup.java | 242 +++++++++++
.../iotdb/db/sync/datasource/ModsfileOpBlock.java | 53 ---
.../iotdb/db/sync/datasource/PipeOpManager.java | 6 +-
.../iotdb/db/sync/datasource/TsFileOpBlock.java | 463 +++++++++++++++++----
.../iotdb/db/sync/externalpipe/ExtPipePlugin.java | 15 +-
.../db/sync/externalpipe/ExtPipePluginManager.java | 4 +-
.../iotdb/db/sync/pipedata/TsFilePipeData.java | 12 +-
.../db/sync/datasource/DeletionGroupTest.java | 231 ++++++++++
.../db/sync/datasource/PipeOpManagerTest.java | 226 +++++++++-
.../db/sync/datasource/TsFileOpBlockTest.java | 372 ++++++++++++++++-
12 files changed, 1454 insertions(+), 189 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 0e9af94a30..66f312fad8 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -82,7 +82,8 @@ public enum ThreadName {
MPP_DATA_EXCHANGE_RPC_SERVER("MPPDataExchangeRPC"),
MPP_DATA_EXCHANGE_RPC_CLIENT("MPPDataExchangeRPC-Client"),
INTERNAL_SERVICE_RPC_SERVER("InternalServiceRPC"),
- INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client");
+ INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client"),
+ EXT_PIPE_PLUGIN_WORKER("ExtPipePlugin-Worker");
private final String name;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
index d4aa103ba5..4823ad5700 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
@@ -35,7 +35,7 @@ public abstract class AbstractOpBlock implements
Comparable<AbstractOpBlock> {
protected String storageGroup;
long filePipeSerialNumber;
- boolean closed = true;
+ boolean closed = false;
// record First Entry's index
protected long beginIndex = -1;
@@ -93,7 +93,7 @@ public abstract class AbstractOpBlock implements
Comparable<AbstractOpBlock> {
/** release current class' resource */
public void close() {
- closed = false;
+ closed = true;
};
public boolean isClosed() {
@@ -107,4 +107,16 @@ public abstract class AbstractOpBlock implements
Comparable<AbstractOpBlock> {
public void setFilePipeSerialNumber(long filePipeSerialNumber) {
this.filePipeSerialNumber = filePipeSerialNumber;
}
+
+ @Override
+ public String toString() {
+ return "storageGroup="
+ + storageGroup
+ + ", filePipeSerialNumber="
+ + filePipeSerialNumber
+ + ", beginIndex="
+ + beginIndex
+ + ", dataCount="
+ + dataCount;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/DeletionGroup.java
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/DeletionGroup.java
new file mode 100644
index 0000000000..5058856627
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/DeletionGroup.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.sync.datasource;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * This class provides below functions
+ *
+ * <p>1) Save many deletion time-intervals.
+ *
+ * <p>2) Merge overlap intervals to 1 interval.
+ *
+ * <p>3) Check whether 1 time-range's data has been deleted according to saved
deletion
+ * time-intervals.
+ *
+ * <p>4) Check whether 1 time-point's data has been deleted according to saved
deletion
+ * time-intervals.
+ *
+ * <p>5) For time-ascending batch data, provide better-performance method to
check whether 1
+ * time-point's data has been deleted.
+ */
+public class DeletionGroup {
+ // TreeMap: StartTime => EndTime
+ private TreeMap<Long, Long> delIntervalMap;
+
+ public enum DeletedType {
+ NO_DELETED, // Mo data has been deleted
+ PARTIAL_DELETED, // Partial data has been deleted
+ FULL_DELETED // All data has been deleted
+ }
+
+ public static class IntervalCursor {
+ Iterator<Map.Entry<Long, Long>> iter = null;
+ boolean subsequentNoDelete = false;
+ public long startTime;
+ public long endTime;
+
+ public void reset() {
+ iter = null;
+ subsequentNoDelete = false;
+ }
+ }
+
+ public DeletionGroup() {
+ delIntervalMap = new TreeMap<>();
+ }
+
+ /**
+ * Insert delete time interval data for every deletion.
+ *
+ * @param startTime
+ * @param endTime
+ */
+ public void addDelInterval(long startTime, long endTime) {
+ if (startTime > endTime) {
+ throw new IllegalArgumentException("addDelInterval(), error: startTime >
endTime.");
+ }
+
+ // == pay attention, intervalMap's Entries are not overlap.
+ Map.Entry<Long, Long> startEntry = delIntervalMap.floorEntry(startTime);
+ Map.Entry<Long, Long> endEntry = delIntervalMap.floorEntry(endTime);
+
+ if ((startEntry != null) && (startTime <= startEntry.getValue())) {
+ startTime = startEntry.getKey();
+ }
+ if ((endEntry != null) && (endTime < endEntry.getValue())) {
+ endTime = endEntry.getValue();
+ }
+
+ // == find existing overlap entries and remove them
+ Map<Long, Long> overlapEntries = delIntervalMap.subMap(startTime, true,
endTime, true);
+ Iterator<Map.Entry<Long, Long>> iter =
overlapEntries.entrySet().iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ iter.remove();
+ }
+
+ delIntervalMap.put(startTime, endTime); // add new deletion interval
+ }
+
+ /**
+ * If this object has no deletion data (i.e delIntervalMap is empty), return
true
+ *
+ * @return
+ */
+ public boolean isEmpty() {
+ return delIntervalMap.isEmpty();
+ }
+
+ /**
+ * Check the deletion-state of the data-points of specific time range
according to the info of
+ * .mods
+ *
+ * @param startTime - the start time of data set, inclusive
+ * @param endTime - the end time of data set, inclusive
+ * @return - Please refer to the definition of DeletedType
+ */
+ public DeletedType checkDeletedState(long startTime, long endTime) {
+ if (delIntervalMap.isEmpty()) {
+ return DeletedType.NO_DELETED;
+ }
+
+ if (startTime > endTime) {
+ throw new IllegalArgumentException("checkDeletedState(), error:
startTime > endTime.");
+ }
+
+ Map.Entry<Long, Long> startEntry = delIntervalMap.floorEntry(startTime);
+ Map.Entry<Long, Long> endEntry = delIntervalMap.floorEntry(endTime);
+
+ if (!Objects.equals(startEntry, endEntry)) {
+ return DeletedType.PARTIAL_DELETED;
+ }
+
+ // == when (startEntry == endEntry == null)
+ if (startEntry == null) {
+ return DeletedType.NO_DELETED;
+ }
+
+ if (startTime > startEntry.getValue()) {
+ return DeletedType.NO_DELETED;
+ }
+
+ if (endTime <= startEntry.getValue()) {
+ return DeletedType.FULL_DELETED;
+ }
+
+ return DeletedType.PARTIAL_DELETED;
+ }
+
+ /**
+ * Check whether this timestamp's data has been deleted according to .mods
info and data timestamp
+ *
+ * @param ts - data timestamp
+ * @return
+ */
+ public boolean isDeleted(long ts) {
+ if (delIntervalMap.isEmpty()) {
+ return false;
+ }
+
+ Map.Entry<Long, Long> entry = delIntervalMap.floorEntry(ts);
+ if (entry == null) {
+ return false;
+ }
+
+ if (ts > entry.getValue()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Check whether ascending timestamp batch data have been deleted according
to .mods info. This
+ * method has better performance than method isDeleted(long ts) for
time-ascending bath data.
+ *
+ * <p>Note1: This method is only used for processing time-ascending batch
data.
+ *
+ * <p>Note2: Input parameter intervalCursor must be 1 variable. For first
calling this method,
+ * need use new variable intervalCursor or call intervalCursor.reset(). Then
continue using same
+ * variable intervalCursor for consequent calling.
+ *
+ * @param ts
+ * @param intervalCursor
+ * @return
+ */
+ public boolean isDeleted(long ts, IntervalCursor intervalCursor) {
+ if (delIntervalMap.isEmpty()) {
+ return false;
+ }
+
+ // == for first calling
+ if (intervalCursor.iter == null) {
+ Long floorKey = delIntervalMap.floorKey(ts);
+ if (floorKey == null) {
+ intervalCursor.iter = delIntervalMap.entrySet().iterator();
+ intervalCursor.startTime = delIntervalMap.firstKey();
+ intervalCursor.endTime = delIntervalMap.firstEntry().getValue();
+ return false;
+ }
+
+ intervalCursor.iter = delIntervalMap.tailMap(floorKey,
true).entrySet().iterator();
+ Map.Entry<Long, Long> entry = intervalCursor.iter.next();
+ intervalCursor.startTime = entry.getKey();
+ intervalCursor.endTime = entry.getValue();
+ }
+
+ if (intervalCursor.subsequentNoDelete) {
+ return false;
+ }
+
+ while (true) {
+ if (ts < intervalCursor.startTime) {
+ return false;
+ }
+ if (ts <= intervalCursor.endTime) {
+ return true;
+ }
+
+ if (intervalCursor.iter.hasNext()) {
+ Map.Entry<Long, Long> entry = intervalCursor.iter.next();
+ intervalCursor.startTime = entry.getKey();
+ intervalCursor.endTime = entry.getValue();
+ continue;
+ } else {
+ intervalCursor.subsequentNoDelete = true;
+ break;
+ }
+ }
+
+ return false;
+ }
+
+ @TestOnly
+ public TreeMap<Long, Long> getDelIntervalMap() {
+ return delIntervalMap;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/ModsfileOpBlock.java
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/ModsfileOpBlock.java
deleted file mode 100644
index f708806842..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/ModsfileOpBlock.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.iotdb.db.sync.datasource;
-
-import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ModsfileOpBlock extends AbstractOpBlock {
- private static final Logger logger =
LoggerFactory.getLogger(ModsfileOpBlock.class);
-
- public ModsfileOpBlock(String sg, String modsFileName) {
- super(sg, -1);
- }
-
- @Override
- public long getDataCount() {
- if (dataCount >= 0) {
- return dataCount;
- }
- // ToDO:
- return 0;
- }
-
- @Override
- public Operation getOperation(long index, long length) {
- return null;
- }
-
- @Override
- public void close() {
- super.close();
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
index ca6e89f1ed..04d1281ebf 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
@@ -74,7 +74,8 @@ public class PipeOpManager {
pipeOpSgManager.addPipeOpBlock(dataSrcEntry);
}
- public void appendTsFile(String sgName, String tsFilename, long
pipeDataSerialNumber)
+ public void appendTsFile(
+ String sgName, String tsFilename, String modsFileFullName, long
pipeDataSerialNumber)
throws IOException {
File file = new File(tsFilename);
if (!file.exists()) {
@@ -87,7 +88,8 @@ public class PipeOpManager {
maxFilePipeSerialNumber = pipeDataSerialNumber;
}
- TsFileOpBlock tsfileDataSrcEntry = new TsFileOpBlock(sgName, tsFilename,
pipeDataSerialNumber);
+ TsFileOpBlock tsfileDataSrcEntry =
+ new TsFileOpBlock(sgName, tsFilename, modsFileFullName,
pipeDataSerialNumber);
appendDataSrc(sgName, tsfileDataSrcEntry);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
index 1c7983e128..93d26275b3 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
@@ -20,9 +20,15 @@
package org.apache.iotdb.db.sync.datasource;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
+import org.apache.iotdb.tsfile.common.cache.LRUCache;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
@@ -47,21 +53,25 @@ import
org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import static java.lang.Math.max;
import static java.lang.Math.min;
+import static
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.FULL_DELETED;
+import static
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.NO_DELETED;
/** This class will parse 1 TsFile's content to 1 operation block. */
public class TsFileOpBlock extends AbstractOpBlock {
@@ -69,36 +79,82 @@ public class TsFileOpBlock extends AbstractOpBlock {
// tsFile name
private String tsFileName;
+ private String modsFileName;
private TsFileFullReader tsFileFullSeqReader;
// full Timeseries Metadata TreeMap : FileOffset => Pair(DeviceId,
TimeseriesMetadata)
private Map<Long, Pair<Path, TimeseriesMetadata>> fullTsMetadataMap;
- // TreeMap: LocalIndex => Pair(SensorFullPath, ChunkOffset, PointCount)
- private TreeMap<Long, Triple<String, Long, Long>> indexToChunkInfoMap;
+ // TreeMap: LocalIndex => ChunkInfo (measurementFullPath, ChunkOffset,
PointCount, deletedFlag)
+ private TreeMap<Long, ChunkInfo> indexToChunkInfoMap;
+
+ // Save all modifications that are from .mods file.
+ // (modificationList == null) means no .mods file or .mods file is empty.
+ Collection<Modification> modificationList;
+ // HashMap: measurement FullPath => DeletionGroup(save deletion info)
+ private Map<String, DeletionGroup> fullPathToDeletionMap;
+
+ // LRUMap: PageOffsetInTsfile => PageData
+ private LRUCache<Long, List<TimeValuePair>> pageCache;
+
+ private boolean dataReady = false;
Decoder timeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
TSDataType.INT64);
+ private class ChunkInfo {
+ public String measurementFullPath;
+ public long chunkOffset;
+ public long pointCount;
+ public DeletionGroup.DeletedType deletedFlag = NO_DELETED;
+ }
+
public TsFileOpBlock(String sg, String tsFileName, long
pipeDataSerialNumber) throws IOException {
this(sg, tsFileName, pipeDataSerialNumber, 0);
}
public TsFileOpBlock(String sg, String tsFileName, long
pipeDataSerialNumber, long beginIndex)
throws IOException {
+ this(sg, tsFileName, null, pipeDataSerialNumber, beginIndex);
+ }
+
+ public TsFileOpBlock(String sg, String tsFileName, String modsFileName, long
pipeDataSerialNumber)
+ throws IOException {
+ this(sg, tsFileName, modsFileName, pipeDataSerialNumber, 0);
+ }
+
+ public TsFileOpBlock(
+ String sg, String tsFileName, String modsFileName, long
pipeDataSerialNumber, long beginIndex)
+ throws IOException {
super(sg, beginIndex);
this.filePipeSerialNumber = pipeDataSerialNumber;
this.tsFileName = tsFileName;
- init();
+
+ this.modsFileName = null;
+ if (modsFileName != null) {
+ if (new File(modsFileName).exists()) {
+ this.modsFileName = modsFileName;
+ }
+ }
+
+ pageCache =
+ new LRUCache<Long, List<TimeValuePair>>(3) {
+ @Override
+ public List<TimeValuePair> loadObjectByKey(Long key) throws
IOException {
+ return null;
+ }
+ };
+
+ calculateDataCount();
}
/**
- * Init TsfileDataSrcEntry's dataCount
+ * Calculate TsfileOpBlock's dataCount
*
* @throws IOException
*/
- private void init() throws IOException {
+ private void calculateDataCount() throws IOException {
// == calculate dataCount according to tsfile
tsFileFullSeqReader = new TsFileFullReader(this.tsFileName);
fullTsMetadataMap = tsFileFullSeqReader.getAllTimeseriesMeta(false);
@@ -110,6 +166,8 @@ public class TsFileOpBlock extends AbstractOpBlock {
for (Pair<Path, TimeseriesMetadata> entry : fullTsMetadataMap.values()) {
dataCount += entry.right.getStatistics().getCount();
}
+
+ // == Here, release fullTsMetadataMap for saving memory.
fullTsMetadataMap = null;
}
@@ -123,6 +181,26 @@ public class TsFileOpBlock extends AbstractOpBlock {
return dataCount;
}
+ /**
+ * Check the deletion-state of the data-points of specific time range
according to the info of
+ * .mods.
+ *
+ * @param measurementPath - measurementPath full path without wildcard
+ * @param startTime - the start time of data set, inclusive
+ * @param endTime - the end time of data set, inclusive
+ * @return
+ */
+ private DeletionGroup.DeletedType checkDeletedState(
+ String measurementPath, long startTime, long endTime) {
+ DeletionGroup deletionGroup = getFullPathDeletion(measurementPath);
+
+ if (deletionGroup == null) {
+ return NO_DELETED;
+ }
+
+ return deletionGroup.checkDeletedState(startTime, endTime);
+ }
+
/**
* Generate indexToChunkInfoMap for whole TsFile
*
@@ -137,8 +215,8 @@ public class TsFileOpBlock extends AbstractOpBlock {
fullTsMetadataMap = tsFileFullSeqReader.getAllTimeseriesMeta(true);
}
- // chunkOffset => pair(SensorFullPath, chunkPointCount)
- Map<Long, Pair<String, Long>> offsetToCountMap = new TreeMap<>();
+ // chunkOffset => ChunkInfo (measurementFullPath, ChunkOffset, PointCount,
deletedFlag)
+ Map<Long, ChunkInfo> offsetToCountMap = new TreeMap<>();
for (Pair<Path, TimeseriesMetadata> value : fullTsMetadataMap.values()) {
List<IChunkMetadata> chunkMetaList = value.right.getChunkMetadataList();
@@ -148,24 +226,95 @@ public class TsFileOpBlock extends AbstractOpBlock {
for (IChunkMetadata chunkMetadata : chunkMetaList) {
// traverse every chunk
- long chunkOffset = chunkMetadata.getOffsetOfChunkHeader();
- long chunkPointCount = chunkMetadata.getStatistics().getCount();
- String sensorFullPath = value.left.getFullPath();
- ;
- offsetToCountMap.put(chunkOffset, new Pair<>(sensorFullPath,
chunkPointCount));
+ ChunkInfo chunkInfo = new ChunkInfo();
+ chunkInfo.measurementFullPath = value.left.getFullPath();
+ chunkInfo.chunkOffset = chunkMetadata.getOffsetOfChunkHeader();
+ chunkInfo.pointCount = chunkMetadata.getStatistics().getCount();
+
+ chunkInfo.deletedFlag =
+ checkDeletedState(
+ chunkInfo.measurementFullPath,
+ chunkMetadata.getStatistics().getStartTime(),
+ chunkMetadata.getStatistics().getEndTime());
+
+ offsetToCountMap.put(chunkInfo.chunkOffset, chunkInfo);
}
}
indexToChunkInfoMap = new TreeMap<>();
long localIndex = 0;
- for (Map.Entry<Long, Pair<String, Long>> entry :
offsetToCountMap.entrySet()) {
- Long chunkHeaderOffset = entry.getKey();
- Long pointCount = entry.getValue().right;
- String sensorFullPath = entry.getValue().left;
- indexToChunkInfoMap.put(
- localIndex, new ImmutableTriple<>(sensorFullPath, chunkHeaderOffset,
pointCount));
- localIndex += pointCount;
+ for (ChunkInfo chunkInfo : offsetToCountMap.values()) {
+ indexToChunkInfoMap.put(localIndex, chunkInfo);
+ localIndex += chunkInfo.pointCount;
+ }
+ }
+
+ /**
+ * Generate modificationList using .mods file. Result: (modificationList ==
null) means no .mods
+ * file or .mods file is empty.
+ *
+ * @throws IOException
+ */
+ private void buildModificationList() throws IOException {
+ if (modsFileName == null) {
+ logger.debug("buildModificationList(), modsFileName is null.");
+ modificationList = null;
+ return;
}
+
+ try (ModificationFile modificationFile = new
ModificationFile(modsFileName)) {
+ modificationList = modificationFile.getModifications();
+ }
+
+ if (modificationList.isEmpty()) {
+ modificationList = null;
+ }
+ }
+
+ /**
+ * Generate fullPathToDeletionMap for fullPath
+ *
+ * @param fullPath measurement full path without wildcard
+ * @return
+ */
+ private DeletionGroup getFullPathDeletion(String fullPath) {
+ // (fullPathToDeletionMap == null) means modificationList is null or empty
+ if (fullPathToDeletionMap == null) {
+ return null;
+ }
+
+ // Try to get data from cache firstly
+ if (fullPathToDeletionMap.containsKey(fullPath)) {
+ return fullPathToDeletionMap.get(fullPath);
+ }
+
+ // == insert all deletion intervals info to deletionGroup.
+ DeletionGroup deletionGroup = new DeletionGroup();
+ PartialPath partialPath = null;
+ try {
+ partialPath = new PartialPath(fullPath);
+ } catch (IllegalPathException e) {
+ logger.error("getFullPathDeletion(), find invalid fullPath: {}",
fullPath);
+ }
+
+ if (partialPath != null) {
+ // == Here, has been sure (modificationList != null) &&
(!modificationList.isEmpty())
+ for (Modification modification : modificationList) {
+ if ((modification instanceof Deletion)
+ && (modification.getPath().matchFullPath(partialPath))) {
+ Deletion deletion = (Deletion) modification;
+ deletionGroup.addDelInterval(deletion.getStartTime(),
deletion.getEndTime());
+ }
+ }
+ }
+
+ if (deletionGroup.isEmpty()) {
+ deletionGroup = null;
+ }
+
+ fullPathToDeletionMap.put(fullPath, deletionGroup);
+
+ return deletionGroup;
}
/**
@@ -173,72 +322,142 @@ public class TsFileOpBlock extends AbstractOpBlock {
* tvPairList for better performance
*
* @param chunkHeader
- * @param indexInChunk
+ * @param startIndexInChunk
* @param length
+ * @param deletionGroup - if it is not null, need to check whether data
points have benn deleted
* @param tvPairList
* @return
* @throws IOException
*/
private long getNonAlignedChunkPoints(
- ChunkHeader chunkHeader, long indexInChunk, long length,
List<TimeValuePair> tvPairList)
+ ChunkHeader chunkHeader,
+ long startIndexInChunk,
+ long length,
+ DeletionGroup deletionGroup,
+ List<TimeValuePair> tvPairList)
throws IOException {
Decoder valueDecoder =
Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType());
- int chunkDataSize = chunkHeader.getDataSize();
+ int chunkLeftDataSize = chunkHeader.getDataSize();
- int index = 0;
- while (chunkDataSize > 0) {
+ long index = 0;
+ while ((chunkLeftDataSize > 0) && ((index - startIndexInChunk) < length)) {
// begin to traverse every page
- long filePos = tsFileFullSeqReader.position();
- boolean hasStatistic = ((chunkHeader.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
+ long pagePosInTsfile = tsFileFullSeqReader.position();
+ boolean pageHeaderHasStatistic =
+ ((chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
PageHeader pageHeader =
- tsFileFullSeqReader.readPageHeader(chunkHeader.getDataType(),
hasStatistic);
+ tsFileFullSeqReader.readPageHeader(chunkHeader.getDataType(),
pageHeaderHasStatistic);
int pageSize = pageHeader.getSerializedPageSize();
- chunkDataSize -= pageSize;
+ chunkLeftDataSize -= pageSize;
- if (hasStatistic) {
+ if (pageHeaderHasStatistic) {
+ // == check whether whole page is out of [startIndexInChunk,
startIndexInChunk + length)
long pageDataCount = pageHeader.getNumOfValues();
- if ((index + pageDataCount) < indexInChunk) { // skip this page
- tsFileFullSeqReader.position(filePos + pageSize);
+ if ((index + pageDataCount) <= startIndexInChunk) { // skip this page
+ tsFileFullSeqReader.position(pagePosInTsfile + pageSize);
index += pageDataCount;
continue;
}
- }
- ByteBuffer pageData =
- tsFileFullSeqReader.readPage(pageHeader,
chunkHeader.getCompressionType());
+ // == check whether whole page has been deleted by .mods file
+ if (deletionGroup == null) { // No deletion related to current chunk
+ continue;
+ }
+
+ DeletionGroup.DeletedType deletedType =
+ deletionGroup.checkDeletedState(pageHeader.getStartTime(),
pageHeader.getEndTime());
- valueDecoder.reset();
- PageReader pageReader =
- new PageReader(pageData, chunkHeader.getDataType(), valueDecoder,
timeDecoder, null);
- BatchData batchData = pageReader.getAllSatisfiedPageData();
- if (chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER) {
- logger.debug("points in the page(by pageHeader): " +
pageHeader.getNumOfValues());
- } else {
- logger.debug("points in the page(by batchData): " +
batchData.length());
+ if (deletedType == FULL_DELETED) {
+ long needCount =
+ min(index + pageDataCount, startIndexInChunk + length)
+ - max(index, startIndexInChunk);
+ for (long i = 0; i < needCount; i++) {
+ tvPairList.add(null);
+ }
+ tsFileFullSeqReader.position(pagePosInTsfile + pageSize);
+ index += pageDataCount;
+ continue;
+ }
}
- if (batchData.isEmpty()) {
- logger.warn("getNonAlignedChunkPoints(), chunk is empty, chunkHeader =
{}.", chunkHeader);
+ // == At first, try to get page-data from pageCache.
+ List<TimeValuePair> pageTVList = pageCache.get(pagePosInTsfile);
+ // == if pageCache has no data
+ if (pageTVList == null) {
+ // == read the page's all data
+ pageTVList = getNonAlignedPagePoints(pageHeader, chunkHeader,
valueDecoder, deletionGroup);
+ pageCache.put(pagePosInTsfile, pageTVList);
}
- batchData.resetBatchData();
- while (batchData.hasCurrent()) {
- if (index++ >= indexInChunk) {
- TimeValuePair timeValuePair =
- new TimeValuePair(batchData.currentTime(),
batchData.currentTsPrimitiveType());
- logger.debug("getNonAlignedChunkPoints(), timeValuePair = {} ",
timeValuePair);
- tvPairList.add(timeValuePair);
- }
- if ((index - indexInChunk) >= length) { // data point is enough
- return (index - indexInChunk);
- }
- batchData.next();
+ int beginIdxInPage = (int) (max(index, startIndexInChunk) - index);
+ int countInPage = (int) min(pageTVList.size(), length - index +
startIndexInChunk);
+ tvPairList.addAll(
+ ((LinkedList) pageTVList).subList(beginIdxInPage, beginIdxInPage +
countInPage));
+
+ index += countInPage;
+ }
+
+ return (index - startIndexInChunk);
+ }
+
+ /**
+ * Parse 1 NonAligned page to get all data points. Note:
+ *
+ * <p>1) New data will be appended to parameter tvPairList for better
performance
+ *
+ * <p>2) deleted data by .mods will be set to null.
+ *
+ * @param pageHeader
+ * @param chunkHeader
+ * @param valueDecoder
+ * @param deletionGroup
+ * @return
+ * @throws IOException
+ */
+ private List<TimeValuePair> getNonAlignedPagePoints(
+ PageHeader pageHeader,
+ ChunkHeader chunkHeader,
+ Decoder valueDecoder,
+ DeletionGroup deletionGroup)
+ throws IOException {
+ List<TimeValuePair> tvList = new LinkedList<>();
+
+ ByteBuffer pageData =
+ tsFileFullSeqReader.readPage(pageHeader,
chunkHeader.getCompressionType());
+
+ valueDecoder.reset();
+ PageReader pageReader =
+ new PageReader(pageData, chunkHeader.getDataType(), valueDecoder,
timeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ if (chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER) {
+ logger.debug("points in the page(by pageHeader): " +
pageHeader.getNumOfValues());
+ } else {
+ logger.debug("points in the page(by batchData): " + batchData.length());
+ }
+
+ if (batchData.isEmpty()) {
+ logger.warn("getNonAlignedChunkPoints(), chunk is empty, chunkHeader =
{}.", chunkHeader);
+ return tvList;
+ }
+
+ batchData.resetBatchData();
+ DeletionGroup.IntervalCursor intervalCursor = new
DeletionGroup.IntervalCursor();
+ while (batchData.hasCurrent()) {
+ long ts = batchData.currentTime();
+ if ((deletionGroup != null) && (deletionGroup.isDeleted(ts,
intervalCursor))) {
+ tvList.add(null);
+ } else {
+ TimeValuePair timeValuePair = new TimeValuePair(ts,
batchData.currentTsPrimitiveType());
+ logger.debug("getNonAlignedChunkPoints(), timeValuePair = {} ",
timeValuePair);
+ tvList.add(timeValuePair);
}
+
+ batchData.next();
}
- return (index - indexInChunk);
+ return tvList;
}
/**
@@ -248,6 +467,7 @@ public class TsFileOpBlock extends AbstractOpBlock {
* @param chunkHeader
* @param indexInChunk
* @param lengthInChunk
+ * @param deletionGroup - if it is not null, need to check whether data
points have benn deleted
* @param tvPairList
* @return
* @throws IOException
@@ -256,6 +476,7 @@ public class TsFileOpBlock extends AbstractOpBlock {
ChunkHeader chunkHeader,
long indexInChunk,
long lengthInChunk,
+ DeletionGroup deletionGroup,
List<TimeValuePair> tvPairList)
throws IOException {
List<long[]> timeBatch = new ArrayList<>();
@@ -324,27 +545,43 @@ public class TsFileOpBlock extends AbstractOpBlock {
}
/**
- * get 1 Chunk's partial data points according to indexInChunk & length
Note: new data will be
- * appended to parameter tvPairList for better performance
+ * get 1 Chunk's partial data points according to indexInChunk &
lengthInChunk Note: new data will
+ * be appended to parameter tvPairList for better performance
*
- * @param chunkHeaderOffset
+ * @param chunkInfo
* @param indexInChunk
- * @param length
+ * @param lengthInChunk
* @param tvPairList, the got data points will be appended to this List.
* @return
* @throws IOException
*/
private long getChunkPoints(
- long chunkHeaderOffset, long indexInChunk, long length,
List<TimeValuePair> tvPairList)
+ ChunkInfo chunkInfo, long indexInChunk, long lengthInChunk,
List<TimeValuePair> tvPairList)
throws IOException {
- tsFileFullSeqReader.position(chunkHeaderOffset);
+
+ // == If whole chunk has been deleted according to .mods file
+ if (chunkInfo.deletedFlag == FULL_DELETED) {
+ for (long i = 0; i < lengthInChunk; i++) {
+ tvPairList.add(null);
+ }
+ return lengthInChunk;
+ }
+
+ tsFileFullSeqReader.position(chunkInfo.chunkOffset);
byte chunkTypeByte = tsFileFullSeqReader.readMarker();
ChunkHeader chunkHeader =
tsFileFullSeqReader.readChunkHeader(chunkTypeByte);
+ DeletionGroup deletionGroup = null;
+ if (chunkInfo.deletedFlag != NO_DELETED) {
+ deletionGroup = getFullPathDeletion(chunkInfo.measurementFullPath);
+ }
+
if (chunkHeader.getDataType() == TSDataType.VECTOR) {
- return getAlignedChunkPoints(chunkHeader, indexInChunk, length,
tvPairList);
+ return getAlignedChunkPoints(
+ chunkHeader, indexInChunk, lengthInChunk, deletionGroup, tvPairList);
} else {
- return getNonAlignedChunkPoints(chunkHeader, indexInChunk, length,
tvPairList);
+ return getNonAlignedChunkPoints(
+ chunkHeader, indexInChunk, lengthInChunk, deletionGroup, tvPairList);
}
}
@@ -365,12 +602,32 @@ public class TsFileOpBlock extends AbstractOpBlock {
try {
measurementPath = new MeasurementPath(sensorFullPath);
} catch (IllegalPathException e) {
- logger.error("TsfileDataSrcEntry.insertToDataList(), Illegal
MeasurementPath: {}", "");
+ logger.error("TsFileOpBlock.insertToDataList(), Illegal MeasurementPath:
{}", "");
throw new IOException("Illegal MeasurementPath: " + sensorFullPath, e);
}
dataList.add(new Pair<>(measurementPath, tvPairList));
}
+ private void prepareData() throws IOException {
+ if (tsFileFullSeqReader == null) {
+ tsFileFullSeqReader = new TsFileFullReader(tsFileName);
+ }
+
+ if (modsFileName != null) {
+ buildModificationList();
+ }
+
+ if ((fullPathToDeletionMap == null) && (modificationList != null)) {
+ fullPathToDeletionMap = new HashMap<>();
+ }
+
+ if (indexToChunkInfoMap == null) {
+ buildIndexToChunkMap();
+ }
+
+ dataReady = true;
+ }
+
/**
* Get 1 Operation that contain needed data. Note: 1) Expected data range is
[index, index+length)
* 2) Real returned data length can less than input parameter length
@@ -382,19 +639,20 @@ public class TsFileOpBlock extends AbstractOpBlock {
*/
@Override
public Operation getOperation(long index, long length) throws IOException {
- long indexInTsfile = index - beginIndex;
-
- if (indexInTsfile < 0 || indexInTsfile >= dataCount) {
- logger.error("TsfileDataSrcEntry.getOperation(), index {} is out of
range.", index);
- throw new IOException("index is out of range.");
+ if (closed) {
+ logger.error("TsFileOpBlock.getOperation(), can not access closed
TsFileOpBlock: {}.", this);
+ throw new IOException("can not access closed TsFileOpBlock: " + this);
}
- if (tsFileFullSeqReader == null) {
- tsFileFullSeqReader = new TsFileFullReader(tsFileName);
+ long indexInTsfile = index - beginIndex;
+ if (indexInTsfile < 0 || indexInTsfile >= dataCount) {
+ logger.error("TsFileOpBlock.getOperation(), Error: index {} is out of
range.", index);
+ // throw new IOException("index is out of range.");
+ return null;
}
- if (indexToChunkInfoMap == null) {
- buildIndexToChunkMap();
+ if (!dataReady) {
+ prepareData();
}
LinkedList<Pair<MeasurementPath, List<TimeValuePair>>> dataList = new
LinkedList<>();
@@ -404,19 +662,18 @@ public class TsFileOpBlock extends AbstractOpBlock {
// handle all chunks that contain needed data
long remain = length;
while (remain > 0) {
- Map.Entry<Long, Triple<String, Long, Long>> entry =
- indexToChunkInfoMap.floorEntry(indexInTsfile);
+ Map.Entry<Long, ChunkInfo> entry =
indexToChunkInfoMap.floorEntry(indexInTsfile);
if (entry == null) {
logger.error(
- "TsfileDataSrcEntry.getOperation(), indexInTsfile {} if out of
indexToChunkOffsetMap.",
+ "TsFileOpBlock.getOperation(), indexInTsfile {} if out of
indexToChunkOffsetMap.",
indexInTsfile);
throw new IOException("indexInTsfile is out of range.");
}
- Long indexInChunk = indexInTsfile - entry.getKey();
- String sensorFullPath = entry.getValue().getLeft();
- Long chunkHeaderOffset = entry.getValue().getMiddle();
- Long chunkPointCount = entry.getValue().getRight();
+ long indexInChunk = indexInTsfile - entry.getKey();
+ ChunkInfo chunkInfo = entry.getValue();
+ String sensorFullPath = chunkInfo.measurementFullPath;
+ long chunkPointCount = chunkInfo.pointCount;
long lengthInChunk = min(chunkPointCount - indexInChunk, remain);
@@ -432,17 +689,17 @@ public class TsFileOpBlock extends AbstractOpBlock {
if (tvPairList == null) {
tvPairList = new LinkedList<>();
}
- long daltaCount = getChunkPoints(chunkHeaderOffset, indexInChunk,
lengthInChunk, tvPairList);
- if (daltaCount != lengthInChunk) {
+ long readCount = getChunkPoints(chunkInfo, indexInChunk, lengthInChunk,
tvPairList);
+ if (readCount != lengthInChunk) {
logger.error(
- "TsfileDataSrcEntry.getOperation(), error when read chunk from
file {}. lengthInChunk={}, daltaCount={}, ",
+ "TsFileOpBlock.getOperation(), error when read chunk from file {}.
lengthInChunk={}, readCount={}, ",
indexInTsfile,
lengthInChunk,
- daltaCount);
+ readCount);
throw new IOException("Error read chunk from file:" + indexInTsfile);
}
- remain -= daltaCount;
+ remain -= readCount;
indexInTsfile = entry.getKey() + chunkPointCount; // next chunk's local
index
if (indexInTsfile >= dataCount) { // has reached the end of this Tsfile
@@ -469,6 +726,8 @@ public class TsFileOpBlock extends AbstractOpBlock {
}
tsFileFullSeqReader = null;
}
+
+ dataReady = false;
}
/** This class is used to read & parse Tsfile */
@@ -601,4 +860,30 @@ public class TsFileOpBlock extends AbstractOpBlock {
return tsFileMetaData;
}
}
+
+ @TestOnly
+ public Collection<Modification> getModificationList() {
+ return modificationList;
+ }
+
+ @TestOnly
+ public Map<String, DeletionGroup> getFullPathToDeletionMap() {
+ return fullPathToDeletionMap;
+ }
+
+ @Override
+ public String toString() {
+ return "storageGroup="
+ + storageGroup
+ + ", tsFileName="
+ + tsFileName
+ + ", modsFileName="
+ + modsFileName
+ + ", filePipeSerialNumber="
+ + filePipeSerialNumber
+ + ", beginIndex="
+ + beginIndex
+ + ", dataCount="
+ + dataCount;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
index 7ef6461191..2bec049a75 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.sync.externalpipe;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -49,7 +51,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -181,13 +182,8 @@ public class ExtPipePlugin {
// == Launch pipe worker threads
executorService =
- Executors.newFixedThreadPool(
- threadNum,
- r -> {
- Thread thread = new Thread(r);
- thread.setName("ExtPipePlugin-worker-" + extPipeTypeName + "-" +
thread.getId());
- return thread;
- });
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ threadNum, ThreadName.EXT_PIPE_PLUGIN_WORKER.getName() + "-" +
extPipeTypeName);
// == Start threads that will run external PiPeSink plugin
dataTransmissionTasks = new ArrayList<>(threadNum);
@@ -529,6 +525,9 @@ public class ExtPipePlugin {
for (Pair<MeasurementPath, List<TimeValuePair>> dataPair :
operation.getDataList()) {
MeasurementPath path = dataPair.left;
for (TimeValuePair tvPair : dataPair.right) {
+ if (tvPair == null) {
+ continue;
+ }
String[] nodes = path.getNodes();
long timestampInMs = tvPair.getTimestamp() / timestampDivisor;
switch (tvPair.getValue().getDataType()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
index b8ed356e86..5a1b1abe5a 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
@@ -195,8 +195,10 @@ public class ExtPipePluginManager {
String sgName = tsFilePipeData.getStorageGroupName();
String tsFileFullName = tsFilePipeData.getTsFilePath();
+ String modsFileFullName = tsFilePipeData.getModsFilePath();
try {
- pipeOpManager.appendTsFile(sgName, tsFileFullName,
pipeDataSerialNumber);
+ pipeOpManager.appendTsFile(
+ sgName, tsFileFullName, modsFileFullName,
pipeDataSerialNumber);
} catch (IOException e) {
logger.error("monitorPipeData(), Can not append TsFile: {}" +
tsFileFullName);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
index 7fbdcadd23..42716766c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
@@ -104,6 +104,14 @@ public class TsFilePipeData extends PipeData {
return parentDirPath + File.separator + tsFileName;
}
+ public String getResourceFilePath() {
+ return getTsFilePath() + TsFileResource.RESOURCE_SUFFIX;
+ }
+
+ public String getModsFilePath() {
+ return getTsFilePath() + ModificationFile.FILE_SUFFIX;
+ }
+
public String getStorageGroupName() {
return storageGroupName;
}
@@ -138,8 +146,8 @@ public class TsFilePipeData extends PipeData {
public List<File> getTsFiles(boolean shouldWaitForTsFileClose) throws
FileNotFoundException {
File tsFile = new File(getTsFilePath()).getAbsoluteFile();
- File resource = new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX);
- File mods = new File(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
+ File resource = new File(getResourceFilePath());
+ File mods = new File(getModsFilePath());
List<File> files = new ArrayList<>();
if (!tsFile.exists()) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java
new file mode 100644
index 0000000000..df10ba50fb
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.sync.datasource;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.FULL_DELETED;
+import static
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.NO_DELETED;
+import static
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.PARTIAL_DELETED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DeletionGroupTest {
+
+ private static DeletionGroup deletionGroup1;
+ private static DeletionGroup deletionGroup2; // empty
+
+ @BeforeClass
+ public static void prepareData() {
+ deletionGroup1 = new DeletionGroup();
+
+ // for item 0
+ deletionGroup1.addDelInterval(10, 30);
+ deletionGroup1.addDelInterval(20, 40);
+
+ // for item 3
+ deletionGroup1.addDelInterval(150, 200);
+ deletionGroup1.addDelInterval(150, 200);
+
+ // for item 1
+ deletionGroup1.addDelInterval(50, 50);
+ deletionGroup1.addDelInterval(50, 50);
+
+ // for item 4
+ deletionGroup1.addDelInterval(220, 300);
+ deletionGroup1.addDelInterval(250, 290);
+
+ // for item 2
+ deletionGroup1.addDelInterval(70, 110);
+ deletionGroup1.addDelInterval(70, 80);
+ deletionGroup1.addDelInterval(80, 90);
+ deletionGroup1.addDelInterval(100, 120);
+
+ deletionGroup2 = new DeletionGroup();
+ }
+
+ @Test
+ public void testAddDelInterval() {
+ boolean hasException = false;
+ try {
+ deletionGroup1.addDelInterval(10, 5);
+ } catch (IllegalArgumentException e) {
+ hasException = true;
+ }
+ assertTrue(hasException);
+
+ TreeMap<Long, Long> delIntervalMap = deletionGroup1.getDelIntervalMap();
+ Iterator<Map.Entry<Long, Long>> iter1 =
delIntervalMap.entrySet().iterator();
+ Map.Entry<Long, Long> entry1 = iter1.next();
+ assertEquals(10, entry1.getKey().longValue());
+ assertEquals(40, entry1.getValue().longValue());
+ entry1 = iter1.next();
+ assertEquals(50, entry1.getKey().longValue());
+ assertEquals(50, entry1.getValue().longValue());
+ entry1 = iter1.next();
+ assertEquals(70, entry1.getKey().longValue());
+ assertEquals(120, entry1.getValue().longValue());
+ entry1 = iter1.next();
+ assertEquals(150, entry1.getKey().longValue());
+ assertEquals(200, entry1.getValue().longValue());
+ entry1 = iter1.next();
+ assertEquals(220, entry1.getKey().longValue());
+ assertEquals(300, entry1.getValue().longValue());
+ }
+
+ @Test
+ public void testCheckDeletedState() {
+ boolean hasException = false;
+ try {
+ deletionGroup1.checkDeletedState(5, 1);
+ } catch (IllegalArgumentException e) {
+ hasException = true;
+ }
+ assertTrue(hasException);
+
+ assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(1, 5));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(1, 10));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(2, 15));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 10));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 20));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(30, 40));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 40));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(40, 40));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(35, 45));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(40, 45));
+
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(50, 50));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(45, 50));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(50, 55));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(45, 55));
+
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(5, 55));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(5, 500));
+
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(120, 140));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(120, 150));
+
+ assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(201, 201));
+ assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(400, 500));
+
+ assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(201, 219));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(201, 220));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 220));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 230));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(230, 250));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(250, 300));
+ assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 300));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(220, 330));
+ assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(240, 350));
+
+ // == test empty deletionGroup2
+ assertEquals(NO_DELETED, deletionGroup2.checkDeletedState(1, 500));
+ }
+
+ @Test
+ public void testIsDeleted() {
+
+ assertEquals(false, deletionGroup1.isDeleted(5));
+ assertEquals(true, deletionGroup1.isDeleted(10));
+ assertEquals(true, deletionGroup1.isDeleted(20));
+ assertEquals(true, deletionGroup1.isDeleted(40));
+ assertEquals(false, deletionGroup1.isDeleted(45));
+
+ assertEquals(true, deletionGroup1.isDeleted(50));
+
+ assertEquals(false, deletionGroup1.isDeleted(60));
+ assertEquals(true, deletionGroup1.isDeleted(70));
+ assertEquals(true, deletionGroup1.isDeleted(100));
+ assertEquals(true, deletionGroup1.isDeleted(120));
+ assertEquals(false, deletionGroup1.isDeleted(122));
+
+ assertEquals(true, deletionGroup1.isDeleted(220));
+ assertEquals(true, deletionGroup1.isDeleted(250));
+ assertEquals(true, deletionGroup1.isDeleted(300));
+ assertEquals(false, deletionGroup1.isDeleted(400));
+
+ // == test empty deletionGroup2
+ assertEquals(false, deletionGroup2.isDeleted(100));
+ }
+
+ @Test
+ public void testIsDeleted2() {
+ DeletionGroup.IntervalCursor intervalCursor = new
DeletionGroup.IntervalCursor();
+ assertEquals(false, deletionGroup1.isDeleted(1, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(5, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(10, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(20, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(40, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(45, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+ intervalCursor = new DeletionGroup.IntervalCursor();
+ assertEquals(true, deletionGroup1.isDeleted(10, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(20, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(40, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(45, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+ intervalCursor.reset();
+ assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor));
+ assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor));
+
+ assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor));
+ assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+ intervalCursor.reset();
+ assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+ // == test empty deletionGroup2
+ intervalCursor = new DeletionGroup.IntervalCursor();
+ assertEquals(false, deletionGroup2.isDeleted(301, intervalCursor));
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
index 26ad9e862a..ece3d7aeb4 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.db.sync.datasource;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,32 +42,42 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public class PipeOpManagerTest {
public static final String TMP_DIR = "target";
private static final String seqTsFileName1 = TMP_DIR + File.separator +
"test1.tsfile";
+ private final String seqModsFileName1 = seqTsFileName1 + ".mods";
private static final String unSeqTsFileName1 = TMP_DIR + File.separator +
"test2.unseq.tsfile";
+ private final String unSeqModsFileName1 = unSeqTsFileName1 + ".mods";
public static final String DEFAULT_TEMPLATE = "template";
+ public final List<String> delFileList = new LinkedList<>();
@Before
public void prepareTestData() throws Exception {
createSeqTsfile(seqTsFileName1);
+ delFileList.add(seqTsFileName1);
+ creatSeqModsFile(seqModsFileName1);
+ delFileList.add(seqModsFileName1);
+
createUnSeqTsfile(unSeqTsFileName1);
+ delFileList.add(unSeqTsFileName1);
+ creatUnSeqModsFile(unSeqModsFileName1);
+ delFileList.add(unSeqModsFileName1);
}
@After
public void removeTestData() throws Exception {
- File file = new File(seqTsFileName1);
- if (file.exists()) {
- file.delete();
- }
-
- file = new File(unSeqTsFileName1);
- if (file.exists()) {
- file.delete();
+ for (String fileName : delFileList) {
+ File file = new File(fileName);
+ if (file.exists()) {
+ file.delete();
+ }
}
}
@@ -107,6 +122,14 @@ public class PipeOpManagerTest {
tsFileWriter.write(tsRecord);
tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+ tsRecord = new TSRecord(1617206403004L, "root.lemming.device3");
+ dPoint1 = new FloatDataPoint("sensor1", 4.1f);
+ dPoint2 = new IntDataPoint("sensor2", 42);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsFileWriter.write(tsRecord);
+ tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
// close TsFile
tsFileWriter.close();
}
@@ -145,9 +168,9 @@ public class PipeOpManagerTest {
tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
tsRecord = new TSRecord(1617206403003L, "root2.lemming.device3");
- dPoint1 = new FloatDataPoint("sensor1", 3.1f);
- dPoint2 = new IntDataPoint("sensor2", 32);
- dPoint3 = new IntDataPoint("sensor3", 33);
+ dPoint1 = new FloatDataPoint("sensor1", 33.1f);
+ dPoint2 = new IntDataPoint("sensor2", 332);
+ dPoint3 = new IntDataPoint("sensor3", 333);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
@@ -155,9 +178,9 @@ public class PipeOpManagerTest {
tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
tsRecord = new TSRecord(1617206403004L, "root2.lemming.device3");
- dPoint1 = new FloatDataPoint("sensor1", 4.1f);
- dPoint2 = new IntDataPoint("sensor2", 42);
- dPoint3 = new IntDataPoint("sensor3", 43);
+ dPoint1 = new FloatDataPoint("sensor1", 44.1f);
+ dPoint2 = new IntDataPoint("sensor2", 442);
+ dPoint3 = new IntDataPoint("sensor3", 443);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
@@ -168,8 +191,45 @@ public class PipeOpManagerTest {
tsFileWriter.close();
}
+ private void creatSeqModsFile(String modsFilePath) throws
IllegalPathException {
+ Modification[] modifications =
+ new Modification[] {
+ new Deletion(new PartialPath("root.lemming.device2.sensor2"), 2,
1617206403002L),
+ new Deletion(
+ new PartialPath("root.lemming.device3.sensor1"), 3,
1617206403003L, 1617206403009L),
+ };
+
+ try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+ for (Modification mod : modifications) {
+ mFile.write(mod);
+ }
+ } catch (IOException e) {
+ fail(e.getMessage());
+ } finally {;
+ }
+ }
+
+ private void creatUnSeqModsFile(String modsFilePath) throws
IllegalPathException {
+ Modification[] modifications =
+ new Modification[] {
+ new Deletion(new PartialPath("root2.lemming.device1.sensor1"), 2,
1617206403001L),
+ new Deletion(new PartialPath("root2.lemming.device2.*"), 3, 2,
Long.MAX_VALUE),
+ new Deletion(
+ new PartialPath("root1.lemming.**"), 3, 2, Long.MAX_VALUE), //
useless entry for root1
+ };
+
+ try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+ for (Modification mod : modifications) {
+ mFile.write(mod);
+ }
+ } catch (IOException e) {
+ fail(e.getMessage());
+ } finally {
+ }
+ }
+
@Test(timeout = 10_000L)
- public void testPipSrcManager() throws IOException {
+ public void testOpManager() throws IOException {
PipeOpManager pipeOpManager = new PipeOpManager(null);
String sgName1 = "root1";
@@ -181,7 +241,7 @@ public class PipeOpManagerTest {
pipeOpManager.appendDataSrc(sgName2, tsFileOpBlock2);
long count1 = tsFileOpBlock1.getDataCount();
- assertEquals(6, count1);
+ assertEquals(8, count1);
for (int i = 0; i < count1; i++) {
Operation operation = pipeOpManager.getOperation(sgName1, i, 8);
System.out.println("=== data" + i + ": " + operation + ", "); //
@@ -189,10 +249,13 @@ public class PipeOpManagerTest {
}
Operation operation = pipeOpManager.getOperation(sgName1, 0, 18);
+ InsertOperation insertOperation = (InsertOperation) operation;
System.out.println("+++ data10" + ": " + operation + ", ");
+ assertEquals(
+ "root.lemming.device1.sensor1",
insertOperation.getDataList().get(0).left.toString());
pipeOpManager.commitData(sgName1, count1 - 1);
- operation = pipeOpManager.getOperation(sgName1, 7, 18);
+ operation = pipeOpManager.getOperation(sgName1, 9, 18);
System.out.println("+++ data11" + ": " + operation + ", ");
assertNull(operation);
@@ -200,8 +263,133 @@ public class PipeOpManagerTest {
System.out.println("+++ data12" + ": " + operation + ", ");
assertEquals(4, operation.getDataCount());
- InsertOperation insertOperation = (InsertOperation) operation;
+ insertOperation = (InsertOperation) operation;
+ assertEquals(
+ "root2.lemming.device3.sensor3",
insertOperation.getDataList().get(0).left.toString());
assertEquals(1617206403003L,
insertOperation.getDataList().get(0).right.get(0).getTimestamp());
- assertEquals("33",
insertOperation.getDataList().get(0).right.get(0).getValue().toString());
+ assertEquals("333",
insertOperation.getDataList().get(0).right.get(0).getValue().toString());
+ }
+
+ @Test // (timeout = 10_000L)
+ public void testOpManager_Mods() throws IOException {
+ PipeOpManager pipeOpManager = new PipeOpManager(null);
+
+ String sgName1 = "root1";
+ // String sgName2 = "root2";
+
+ TsFileOpBlock tsFileOpBlock1 = new TsFileOpBlock(sgName1, seqTsFileName1,
seqModsFileName1, 1);
+ pipeOpManager.appendDataSrc(sgName1, tsFileOpBlock1);
+ TsFileOpBlock tsFileOpBlock2 =
+ new TsFileOpBlock(sgName1, unSeqTsFileName1, unSeqModsFileName1, 2);
+ pipeOpManager.appendDataSrc(sgName1, tsFileOpBlock2);
+
+ long count1 = tsFileOpBlock1.getDataCount();
+ assertEquals(8, count1);
+ for (int i = 0; i < 18; i++) {
+ Operation operation = pipeOpManager.getOperation(sgName1, i, 8);
+ assertEquals(sgName1, operation.getStorageGroup());
+ }
+
+ // == test batch data in TsFile1 + .mods
+ Operation operation = pipeOpManager.getOperation(sgName1, 0, 18);
+ assertEquals(8, operation.getDataCount());
+
+ InsertOperation insertOperation = (InsertOperation) operation;
+ int i = 0;
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("1.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 1;
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("12",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 2;
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("13",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 3;
+ assertEquals(1, insertOperation.getDataList().get(i).right.size());
+ assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+ i = 4;
+ assertEquals(1, insertOperation.getDataList().get(i).right.size());
+ assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+ i = 5;
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("32",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 6;
+ assertEquals(1, insertOperation.getDataList().get(i).right.size());
+ assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+ i = 7;
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("42",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ // == test batch data in TsFile2 + mods
+ operation = pipeOpManager.getOperation(sgName1, 8, 18);
+ assertEquals(10, operation.getDataCount());
+
+ insertOperation = (InsertOperation) operation;
+ i = 0;
+ assertEquals(
+ "root2.lemming.device1.sensor1",
insertOperation.getDataList().get(i).left.toString());
+ assertEquals(1, insertOperation.getDataList().get(i).right.size());
+ assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+ i = 1;
+ assertEquals(
+ "root2.lemming.device1.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("12",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 2;
+ assertEquals(
+ "root2.lemming.device1.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("13",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 3;
+ assertEquals(
+ "root2.lemming.device2.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1, insertOperation.getDataList().get(i).right.size());
+ assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+ i = 4;
+ assertEquals(
+ "root2.lemming.device3.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("33.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 5;
+ assertEquals(
+ "root2.lemming.device3.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("332",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 6;
+ assertEquals(
+ "root2.lemming.device3.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("333",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 7;
+ assertEquals(
+ "root2.lemming.device3.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("44.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 8;
+ assertEquals(
+ "root2.lemming.device3.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("442",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 9;
+ assertEquals(
+ "root2.lemming.device3.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("443",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
index 2c29effdc0..67571bd83d 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.db.sync.datasource;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,29 +42,53 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public class TsFileOpBlockTest {
- public static final String TMP_DIR = "target";
- private static final String tsFileName1 = TMP_DIR + File.separator +
"test1.tsfile";
- public static final String DEFAULT_TEMPLATE = "template";
+ public final String TMP_DIR = "target";
+ private final String tsFileName1 = TMP_DIR + File.separator + "test1.tsfile";
+ private final String tsFileName2 = TMP_DIR + File.separator + "test2.tsfile";
+ private final String modsFileName2 = tsFileName2 + ".mods";
+ private final String tsFileName3 = TMP_DIR + File.separator + "test3.tsfile";
+ private final String modsFileName3 = tsFileName3 + ".mods";
+ public final List<String> fileNameList = new LinkedList<>();
+
+ public final String DEFAULT_TEMPLATE = "template";
@Before
public void prepareTestData() throws Exception {
- createTsfile(tsFileName1);
+ createTsfile1(tsFileName1);
+ fileNameList.add(tsFileName1);
+
+ createTsfile2(tsFileName2);
+ fileNameList.add(tsFileName2);
+ creatModsFile2(modsFileName2);
+ fileNameList.add(modsFileName2);
+
+ createTsfile2(tsFileName3);
+ fileNameList.add(tsFileName3);
+ creatModsFile3(modsFileName3);
+ fileNameList.add(modsFileName3);
}
@After
public void removeTestData() throws Exception {
- File file = new File(tsFileName1);
- if (file.exists()) {
- file.delete();
+ for (String fileName : fileNameList) {
+ File file = new File(fileName);
+ if (file.exists()) {
+ file.delete();
+ }
}
}
- private void createTsfile(String tsfilePath) throws Exception {
+ private void createTsfile1(String tsfilePath) throws Exception {
File file = new File(tsfilePath);
if (file.exists()) {
file.delete();
@@ -105,7 +134,7 @@ public class TsFileOpBlockTest {
}
@Test(timeout = 10_000L)
- public void testSingleReadEntry() throws IOException {
+ public void testOpBlock() throws IOException {
TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName1, 0);
assertEquals("root", tsFileOpBlock.getStorageGroup());
@@ -132,10 +161,12 @@ public class TsFileOpBlockTest {
}
InsertOperation insertOperation = (InsertOperation) operation;
+
+ int k = 0;
assertEquals(
- "root.lemming.device3.sensor2",
insertOperation.getDataList().get(0).left.getFullPath());
- assertEquals(1617206403003L,
insertOperation.getDataList().get(0).right.get(0).getTimestamp());
- assertEquals("32",
insertOperation.getDataList().get(0).right.get(0).getValue().toString());
+ "root.lemming.device3.sensor2",
insertOperation.getDataList().get(k).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(k).right.get(0).getTimestamp());
+ assertEquals("32",
insertOperation.getDataList().get(k).right.get(0).getValue().toString());
for (int i = 0; i <= tsFileOpBlock.getDataCount() - 3; i++) {
operation = tsFileOpBlock.getOperation(i + 2, 3);
@@ -157,4 +188,321 @@ public class TsFileOpBlockTest {
tsFileOpBlock.close();
}
+
+ // == test TsFile + .mods
+
+ private void createTsfile2(String tsfilePath) throws Exception {
+ File file = new File(tsfilePath);
+ if (file.exists()) {
+ file.delete();
+ }
+
+ Schema schema = new Schema();
+ schema.extendTemplate(
+ DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT,
TSEncoding.RLE));
+ schema.extendTemplate(
+ DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32,
TSEncoding.TS_2DIFF));
+ schema.extendTemplate(
+ DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT32,
TSEncoding.TS_2DIFF));
+
+ TsFileWriter tsFileWriter = new TsFileWriter(file, schema);
+
+ // construct TSRecord
+ TSRecord tsRecord = new TSRecord(1617206403001L, "root.lemming.device1");
+ DataPoint dPoint1 = new FloatDataPoint("sensor1", 1.1f);
+ DataPoint dPoint2 = new IntDataPoint("sensor2", 12);
+ DataPoint dPoint3 = new IntDataPoint("sensor3", 13);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+ tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+ tsRecord = new TSRecord(1617206403002L, "root.lemming.device2");
+ dPoint2 = new IntDataPoint("sensor2", 22);
+ tsRecord.addTuple(dPoint2);
+ tsFileWriter.write(tsRecord);
+ tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+ tsRecord = new TSRecord(1617206403003L, "root.lemming.device3");
+ dPoint1 = new FloatDataPoint("sensor1", 3.1f);
+ dPoint2 = new IntDataPoint("sensor2", 32);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsFileWriter.write(tsRecord);
+ tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+ tsRecord = new TSRecord(1617206403004L, "root.lemming.device1");
+ dPoint1 = new FloatDataPoint("sensor1", 4.1f);
+ dPoint2 = new IntDataPoint("sensor2", 42);
+ dPoint3 = new IntDataPoint("sensor3", 43);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+ tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+ // close TsFile
+ tsFileWriter.close();
+ }
+
+ private void creatModsFile2(String modsFilePath) throws IllegalPathException
{
+ Modification[] modifications =
+ new Modification[] {
+ // new Deletion(new PartialPath(new String[] {"d1", "s2"}), 1, 2),
+ new Deletion(new PartialPath("root.lemming.device1.sensor1"), 2, 1),
+ new Deletion(new PartialPath("root.lemming.device1.sensor1"), 3, 2,
5),
+ new Deletion(new PartialPath("root.lemming.**"), 11, 1,
Long.MAX_VALUE)
+ };
+
+ try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+ for (Modification mod : modifications) {
+ mFile.write(mod);
+ }
+ } catch (IOException e) {
+ fail(e.getMessage());
+ } finally {;
+ }
+ }
+
+ private void creatModsFile3(String modsFilePath) throws IllegalPathException
{
+ Modification[] modifications =
+ new Modification[] {
+ new Deletion(new PartialPath("root.lemming.device1.sensor1"), 2,
1617206403001L),
+ new Deletion(new PartialPath("root.lemming.device2.*"), 3, 2,
Long.MAX_VALUE),
+ };
+
+ try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+ for (Modification mod : modifications) {
+ mFile.write(mod);
+ }
+ } catch (IOException e) {
+ fail(e.getMessage());
+ } finally {;
+ }
+ }
+
+ @Test(timeout = 10_000L)
+ public void testOpBlockMods2() throws IOException {
+
+ List<Modification> modificationList = null;
+ try (ModificationFile mFile = new ModificationFile(modsFileName2)) {
+ modificationList = (List<Modification>) mFile.getModifications();
+ }
+ // System.out.println("=== data: " + modificationList);
+
+ TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName2,
modsFileName2, 0);
+
+ assertEquals("root", tsFileOpBlock.getStorageGroup());
+ assertEquals(0, tsFileOpBlock.getBeginIndex());
+ assertEquals(9, tsFileOpBlock.getDataCount());
+ assertEquals(9, tsFileOpBlock.getNextIndex());
+
+ // == check setBeginIndex()
+ tsFileOpBlock.setBeginIndex(55);
+ assertEquals(64, tsFileOpBlock.getNextIndex());
+
+ // == check result before and after calling tsFileOpBlock.getOperation()
+ assertNull(tsFileOpBlock.getFullPathToDeletionMap());
+ assertNull(tsFileOpBlock.getModificationList());
+ Operation operation = tsFileOpBlock.getOperation(55, 1);
+ ;
+ assertNotNull(tsFileOpBlock.getFullPathToDeletionMap());
+ assertEquals(modificationList, tsFileOpBlock.getModificationList());
+ assertEquals(9, tsFileOpBlock.getDataCount());
+
+ // == check tsFileOpBlock.getOperation()
+ for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) {
+ operation = tsFileOpBlock.getOperation(i + 55, 1);
+ assertEquals("root", operation.getStorageGroup());
+ assertEquals(1, operation.getDataCount());
+ assertEquals(i + 55, operation.getStartIndex());
+ assertEquals(i + 56, operation.getEndIndex());
+
+ assertEquals(true, operation instanceof InsertOperation);
+ InsertOperation insertOperation = (InsertOperation) operation;
+ assertEquals(1, insertOperation.getDataList().size());
+ // System.out.println("=== data" + i + ": " + operation +
((InsertOperation)
+ // operation).getDataList());
+ }
+
+ // == check deleted data caused by .mods file
+ operation = tsFileOpBlock.getOperation(55, 15);
+ assertEquals(9, operation.getDataCount());
+ InsertOperation insertOperation = (InsertOperation) operation;
+
+ int i = 0;
+ assertEquals(
+ "root.lemming.device1.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 1;
+ assertEquals(
+ "root.lemming.device1.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 2;
+ assertEquals(
+ "root.lemming.device1.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 3;
+ assertEquals(
+ "root.lemming.device2.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 4;
+ assertEquals(
+ "root.lemming.device3.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 5;
+ assertEquals(
+ "root.lemming.device3.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ // assertEquals(1617206403003L,
+ // insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ // assertEquals("32",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ tsFileOpBlock.close();
+ }
+
+ @Test(timeout = 10_000L)
+ public void testOpBlockMods3() throws IOException {
+
+ List<Modification> modificationList = null;
+ try (ModificationFile mFile = new ModificationFile(modsFileName3)) {
+ modificationList = (List<Modification>) mFile.getModifications();
+ }
+
+ TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName2,
modsFileName3, 0);
+
+ assertEquals("root", tsFileOpBlock.getStorageGroup());
+ assertEquals(0, tsFileOpBlock.getBeginIndex());
+ assertEquals(9, tsFileOpBlock.getDataCount());
+ assertEquals(9, tsFileOpBlock.getNextIndex());
+
+ // == check setBeginIndex()
+ tsFileOpBlock.setBeginIndex(55);
+ assertEquals(64, tsFileOpBlock.getNextIndex());
+
+ // == check result before and after calling tsFileOpBlock.getOperation()
+ assertNull(tsFileOpBlock.getFullPathToDeletionMap());
+ assertNull(tsFileOpBlock.getModificationList());
+ Operation operation = tsFileOpBlock.getOperation(55, 1);
+
+ assertNotNull(tsFileOpBlock.getFullPathToDeletionMap());
+ assertEquals(modificationList, tsFileOpBlock.getModificationList());
+ assertEquals(9, tsFileOpBlock.getDataCount());
+
+ // == check tsFileOpBlock.getOperation()
+ for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) {
+ operation = tsFileOpBlock.getOperation(i + 55, 1);
+ assertEquals("root", operation.getStorageGroup());
+ assertEquals(1, operation.getDataCount());
+ assertEquals(i + 55, operation.getStartIndex());
+ assertEquals(i + 56, operation.getEndIndex());
+
+ assertEquals(true, operation instanceof InsertOperation);
+ InsertOperation insertOperation = (InsertOperation) operation;
+ assertEquals(1, insertOperation.getDataList().size());
+ // System.out.println("=== data" + i + ": " + operation +
((InsertOperation)
+ // operation).getDataList());
+ }
+
+ // == check deleted data caused by .mods file
+ operation = tsFileOpBlock.getOperation(55, 20);
+ assertEquals(9, operation.getDataCount());
+ InsertOperation insertOperation = (InsertOperation) operation;
+
+ int i = 0;
+ assertEquals(
+ "root.lemming.device1.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 1;
+ assertEquals(
+ "root.lemming.device1.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("12",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 2;
+ assertEquals(
+ "root.lemming.device1.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403001L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("13",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 3;
+ assertEquals(
+ "root.lemming.device2.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+ i = 4;
+ assertEquals(
+ "root.lemming.device3.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("3.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 5;
+ assertEquals(
+ "root.lemming.device3.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("32",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 6;
+ assertEquals(
+ "root.lemming.device1.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("4.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 7;
+ assertEquals(
+ "root.lemming.device1.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("42",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 8;
+ assertEquals(
+ "root.lemming.device1.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("43",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ // == test getting old data and page cache
+ operation = tsFileOpBlock.getOperation(59, 20);
+ assertEquals(5, operation.getDataCount());
+ insertOperation = (InsertOperation) operation;
+
+ i = 0;
+ assertEquals(
+ "root.lemming.device3.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("3.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 1;
+ assertEquals(
+ "root.lemming.device3.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403003L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("32",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 2;
+ assertEquals(
+ "root.lemming.device1.sensor1",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("4.1",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 3;
+ assertEquals(
+ "root.lemming.device1.sensor2",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("42",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ i = 4;
+ assertEquals(
+ "root.lemming.device1.sensor3",
insertOperation.getDataList().get(i).left.getFullPath());
+ assertEquals(1617206403004L,
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+ assertEquals("43",
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+ tsFileOpBlock.close();
+ }
}