This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch tsfile_name
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tsfile_name by this push:
new 1ba3f11060 [IOTDB-3608] Modify key of cache and MergeReader with new
TsFile version rule (#6394)
1ba3f11060 is described below
commit 1ba3f11060ffc99ac98916ba636568c152631188
Author: 周沛辰 <[email protected]>
AuthorDate: Mon Jul 4 14:41:42 2022 +0800
[IOTDB-3608] Modify key of cache and MergeReader with new TsFile version
rule (#6394)
* add timestamp in ChunkCache,TimeseriesMetadataCache and BloomFilterCache
* add timestamp in MergeReaderPriority
Co-authored-by: 沛辰周 <[email protected]>
---
.../reader/mult/IAssignPathPriorityMergeReader.java | 2 +-
.../apache/iotdb/db/engine/cache/BloomFilterCache.java | 5 ++++-
.../iotdb/db/engine/cache/TimeSeriesMetadataCache.java | 5 ++++-
.../db/mpp/execution/operator/source/SeriesScanUtil.java | 7 +++++--
.../iotdb/db/query/reader/series/SeriesReader.java | 7 +++++--
.../db/query/reader/universal/PriorityMergeReader.java | 16 ++++++++++++----
.../iotdb/tsfile/file/metadata/AlignedChunkMetadata.java | 4 ++++
.../apache/iotdb/tsfile/file/metadata/ChunkMetadata.java | 11 ++++++++++-
.../iotdb/tsfile/file/metadata/IChunkMetadata.java | 2 ++
.../org/apache/iotdb/tsfile/utils/FilePathUtils.java | 11 ++++++-----
10 files changed, 53 insertions(+), 17 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
index a3442884bb..5d990bd9fe 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
@@ -39,7 +39,7 @@ public interface IAssignPathPriorityMergeReader extends
IPointReader {
getFullPath(),
reader,
reader.nextTimeValuePair(getFullPath()),
- new PriorityMergeReader.MergeReaderPriority(priority, 0)));
+ new PriorityMergeReader.MergeReaderPriority(priority, 0,
0)));
} else {
reader.close();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java
b/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java
index c3e0d9197e..3b898c6fed 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java
@@ -148,6 +148,7 @@ public class BloomFilterCache {
// share this String.
private final String filePath;
private final String tsFilePrefixPath;
+ private final long timestamp;
private final long tsFileVersion;
// high 32 bit is compaction level, low 32 bit is merge count
private final long compactionVersion;
@@ -159,6 +160,7 @@ public class BloomFilterCache {
this.tsFilePrefixPath = tsFilePrefixPathAndTsFileVersionPair.left;
this.tsFileVersion = tsFilePrefixPathAndTsFileVersionPair.right[0];
this.compactionVersion = tsFilePrefixPathAndTsFileVersionPair.right[1];
+ this.timestamp = tsFilePrefixPathAndTsFileVersionPair.right[2];
}
@Override
@@ -172,7 +174,8 @@ public class BloomFilterCache {
BloomFilterCache.BloomFilterCacheKey that =
(BloomFilterCache.BloomFilterCacheKey) o;
return tsFileVersion == that.tsFileVersion
&& compactionVersion == that.compactionVersion
- && tsFilePrefixPath.equals(that.tsFilePrefixPath);
+ && tsFilePrefixPath.equals(that.tsFilePrefixPath)
+ && timestamp == that.timestamp;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 7f10ce7df6..5dac5eb514 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -268,6 +268,7 @@ public class TimeSeriesMetadataCache {
private final String filePath;
private final String tsFilePrefixPath;
+ private final long timestamp;
private final long tsFileVersion;
// high 32 bit is compaction level, low 32 bit is merge count
private final long compactionVersion;
@@ -281,6 +282,7 @@ public class TimeSeriesMetadataCache {
this.tsFilePrefixPath = tsFilePrefixPathAndTsFileVersionPair.left;
this.tsFileVersion = tsFilePrefixPathAndTsFileVersionPair.right[0];
this.compactionVersion = tsFilePrefixPathAndTsFileVersionPair.right[1];
+ this.timestamp = tsFilePrefixPathAndTsFileVersionPair.right[2];
this.device = device;
this.measurement = measurement;
}
@@ -298,7 +300,8 @@ public class TimeSeriesMetadataCache {
&& Objects.equals(device, that.device)
&& tsFileVersion == that.tsFileVersion
&& compactionVersion == that.compactionVersion
- && tsFilePrefixPath.equals(that.tsFilePrefixPath);
+ && tsFilePrefixPath.equals(that.tsFilePrefixPath)
+ && timestamp == that.timestamp;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 279f6be932..edc72eb168 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -522,6 +522,7 @@ public class SeriesScanUtil {
for (IPageReader iPageReader : pageReaderList) {
seqPageReaders.add(
new VersionPageReader(
+ chunkMetaData.getTimestamp(),
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
iPageReader,
@@ -531,6 +532,7 @@ public class SeriesScanUtil {
for (int i = pageReaderList.size() - 1; i >= 0; i--) {
seqPageReaders.add(
new VersionPageReader(
+ chunkMetaData.getTimestamp(),
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
pageReaderList.get(i),
@@ -542,6 +544,7 @@ public class SeriesScanUtil {
pageReader ->
unSeqPageReaders.add(
new VersionPageReader(
+ chunkMetaData.getTimestamp(),
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
pageReader,
@@ -1101,8 +1104,8 @@ public class SeriesScanUtil {
protected boolean isSeq;
- VersionPageReader(long version, long offset, IPageReader data, boolean
isSeq) {
- this.version = new PriorityMergeReader.MergeReaderPriority(version,
offset);
+ VersionPageReader(long timestamp, long version, long offset, IPageReader
data, boolean isSeq) {
+ this.version = new PriorityMergeReader.MergeReaderPriority(timestamp,
version, offset);
this.data = data;
this.isSeq = isSeq;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 5a16e4392a..d865ff81df 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -602,6 +602,7 @@ public class SeriesReader {
for (IPageReader iPageReader : pageReaderList) {
seqPageReaders.add(
new VersionPageReader(
+ chunkMetaData.getTimestamp(),
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
iPageReader,
@@ -611,6 +612,7 @@ public class SeriesReader {
for (int i = pageReaderList.size() - 1; i >= 0; i--) {
seqPageReaders.add(
new VersionPageReader(
+ chunkMetaData.getTimestamp(),
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
pageReaderList.get(i),
@@ -622,6 +624,7 @@ public class SeriesReader {
pageReader ->
unSeqPageReaders.add(
new VersionPageReader(
+ chunkMetaData.getTimestamp(),
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
pageReader,
@@ -1149,8 +1152,8 @@ public class SeriesReader {
protected boolean isSeq;
- VersionPageReader(long version, long offset, IPageReader data, boolean
isSeq) {
- this.version = new MergeReaderPriority(version, offset);
+ VersionPageReader(long timestamp, long version, long offset, IPageReader
data, boolean isSeq) {
+ this.version = new MergeReaderPriority(timestamp, version, offset);
this.data = data;
this.isSeq = isSeq;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index af5f3202df..75bd56798c 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -66,7 +66,7 @@ public class PriorityMergeReader implements IPointReader {
public void addReader(IPointReader reader, long priority) throws IOException
{
if (reader.hasNextTimeValuePair()) {
heap.add(
- new Element(reader, reader.nextTimeValuePair(), new
MergeReaderPriority(priority, 0)));
+ new Element(reader, reader.nextTimeValuePair(), new
MergeReaderPriority(priority, 0, 0)));
} else {
reader.close();
}
@@ -169,20 +169,28 @@ public class PriorityMergeReader implements IPointReader {
}
public static class MergeReaderPriority implements
Comparable<MergeReaderPriority> {
+ long timestamp;
long version;
long offset;
- public MergeReaderPriority(long version, long offset) {
+ public MergeReaderPriority(long timestamp, long version, long offset) {
+ this.timestamp = timestamp;
this.version = version;
this.offset = offset;
}
@Override
public int compareTo(MergeReaderPriority o) {
- if (version < o.version) {
+ if (timestamp < o.timestamp) {
return -1;
+ } else if (timestamp > o.timestamp) {
+ return 1;
+ } else {
+ if (version < o.version) {
+ return -1;
+ }
+ return ((version > o.version) ? 1 : (Long.compare(offset, o.offset)));
}
- return ((version > o.version) ? 1 : (Long.compare(offset, o.offset)));
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
index a69e61b9b8..28c66a304c 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
@@ -103,6 +103,10 @@ public class AlignedChunkMetadata implements
IChunkMetadata {
}
}
+ public long getTimestamp() {
+ return timeChunkMetadata.getTimestamp();
+ }
+
@Override
public long getOffsetOfChunkHeader() {
return timeChunkMetadata.getOffsetOfChunkHeader();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..a3d2ff3a59 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -82,6 +82,9 @@ public class ChunkMetadata implements IChunkMetadata {
// high 32 bit is compaction level, low 32 bit is merge count
private long compactionVersion;
+ // used for ChunkCache, tsfile timestamp
+ private long timestamp;
+
public ChunkMetadata() {}
/**
@@ -196,6 +199,10 @@ public class ChunkMetadata implements IChunkMetadata {
this.version = version;
}
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
public List<TimeRange> getDeleteIntervalList() {
return deleteIntervalList;
}
@@ -250,7 +257,8 @@ public class ChunkMetadata implements IChunkMetadata {
return offsetOfChunkHeader == that.offsetOfChunkHeader
&& version == that.version
&& compactionVersion == that.compactionVersion
- && tsFilePrefixPath.equals(that.tsFilePrefixPath);
+ && tsFilePrefixPath.equals(that.tsFilePrefixPath)
+ && timestamp == that.timestamp;
}
@Override
@@ -326,6 +334,7 @@ public class ChunkMetadata implements IChunkMetadata {
tsFilePrefixPath = tsFilePrefixPathAndTsFileVersionPair.left;
this.version = tsFilePrefixPathAndTsFileVersionPair.right[0];
this.compactionVersion = tsFilePrefixPathAndTsFileVersionPair.right[1];
+ this.timestamp = tsFilePrefixPathAndTsFileVersionPair.right[2];
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index 1cc819fd52..0a92612001 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -45,6 +45,8 @@ public interface IChunkMetadata {
void setVersion(long version);
+ long getTimestamp();
+
long getOffsetOfChunkHeader();
long getStartTime();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
index 5e815c59f7..8edd758421 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
@@ -114,13 +114,13 @@ public class FilePathUtils {
}
/**
- * @return a long array whose length is 2, the first long value is tsfile
version, second long
+ * @return a long array whose length is 3, the first long value is tsfile
version, second long
* value is compaction version, high 32 bit is in-space compaction
count, low 32 bit is
- * cross-space compaction count
+ * cross-space compaction count, the third long value is timestamp.
*/
private static long[] splitAndGetVersionArray(String tsFileName) {
String[] names = tsFileName.split(FILE_NAME_SEPARATOR);
- long[] versionArray = new long[2];
+ long[] versionArray = new long[3];
if (names.length != 4) {
return versionArray;
}
@@ -128,6 +128,7 @@ public class FilePathUtils {
versionArray[1] =
(Long.parseLong(names[2]) << 32)
| Long.parseLong(names[3].substring(0, names[3].length() -
TSFILE_SUFFIX.length()));
+ versionArray[2] = Long.parseLong(names[0]);
return versionArray;
}
@@ -141,8 +142,8 @@ public class FilePathUtils {
/**
* pair.left tsFilePrefixPath, like data/data/sequence/root.sg1/0/0
pair.right is a long array
- * whose length is 2 pair.right[0] is tsfile version pair.right[1] is
compaction version, high 32
- * bit is compaction level, low 32 bit is merge count
+ * whose length is 3, pair.right[0] is tsfile version, pair.right[1] is
compaction version, high
+ * 32 bit is compaction level, low 32 bit is merge count, pair.right[2] is
tsfile timestamp.
*/
public static Pair<String, long[]> getTsFilePrefixPathAndTsFileVersionPair(
String tsFileAbsolutePath) {