This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch move_memtable_plan_index in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4e1676ac1ab6fc852b28bd01150a9bf1720e598f Author: jt <[email protected]> AuthorDate: Mon Dec 14 10:30:03 2020 +0800 move plan index from chunk group footer to a separate marker --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 1 + .../org/apache/iotdb/tsfile/file/MetaMarker.java | 4 +++ .../iotdb/tsfile/file/footer/ChunkGroupFooter.java | 31 ++++------------------ .../iotdb/tsfile/read/TsFileSequenceReader.java | 20 ++++++++++++-- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 10 ++++++- .../tsfile/read/TsFileSequenceReaderTest.java | 2 +- .../iotdb/tsfile/write/TsFileIOWriterTest.java | 8 ++++++ 7 files changed, 46 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index ff8d191..11c5303 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -113,6 +113,7 @@ public class MemTableFlushTask { try { writer.writeVersion(memTable.getVersion()); + writer.writePlanIndices(); } catch (IOException e) { throw new ExecutionException(e); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java index 758f0d5..b396a9e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/MetaMarker.java @@ -30,6 +30,10 @@ public class MetaMarker { public static final byte CHUNK_HEADER = 1; public static final byte SEPARATOR = 2; public static final byte VERSION = 3; + // following this marker are two longs marking the minimum and maximum indices of operations + // involved in the last flushed MemTable, which are generally used to support checkpoint, + // snapshot, or backup. + public static final byte OPERATION_INDEX_RANGE = 4; private MetaMarker() { } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java index 07e5577..cf7181c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java @@ -38,9 +38,6 @@ public class ChunkGroupFooter { private int numberOfChunks; - private long minPlanIndex; - private long maxPlanIndex; - // this field does not need to be serialized. private int serializedSize; @@ -51,16 +48,13 @@ public class ChunkGroupFooter { * @param dataSize data size * @param numberOfChunks number of chunks */ - public ChunkGroupFooter(String deviceID, long dataSize, int numberOfChunks, long minPlanIndex, - long maxPlanIndex) { + public ChunkGroupFooter(String deviceID, long dataSize, int numberOfChunks) { this.deviceID = deviceID; this.dataSize = dataSize; this.numberOfChunks = numberOfChunks; - this.minPlanIndex = minPlanIndex; - this.maxPlanIndex = maxPlanIndex; this.serializedSize = Byte.BYTES + Integer.BYTES + deviceID.getBytes(TSFileConfig.STRING_CHARSET).length - + Long.BYTES + Integer.BYTES + Long.BYTES + Long.BYTES; + + Long.BYTES + Integer.BYTES; } public static int getSerializedSize(String deviceID) { @@ -89,9 +83,7 @@ public class ChunkGroupFooter { String deviceID = ReadWriteIOUtils.readString(inputStream); long dataSize = ReadWriteIOUtils.readLong(inputStream); int numOfChunks = ReadWriteIOUtils.readInt(inputStream); - long minPlanIndex = ReadWriteIOUtils.readLong(inputStream); - long maxPlanIndex = ReadWriteIOUtils.readLong(inputStream); - return new ChunkGroupFooter(deviceID, dataSize, numOfChunks, minPlanIndex, maxPlanIndex); + return new ChunkGroupFooter(deviceID, dataSize, numOfChunks); } /** @@ -116,9 +108,7 @@ public class ChunkGroupFooter { String deviceID = ReadWriteIOUtils.readStringWithLength(buffer, size); long dataSize = ReadWriteIOUtils.readLong(buffer); int numOfChunks = ReadWriteIOUtils.readInt(buffer); - long minPlanIndex = ReadWriteIOUtils.readLong(buffer); - long maxPlanIndex = ReadWriteIOUtils.readLong(buffer); - return new ChunkGroupFooter(deviceID, dataSize, numOfChunks, minPlanIndex, maxPlanIndex); + return new ChunkGroupFooter(deviceID, dataSize, numOfChunks); } public int getSerializedSize() { @@ -154,8 +144,6 @@ public class ChunkGroupFooter { length += ReadWriteIOUtils.write(deviceID, outputStream); length += ReadWriteIOUtils.write(dataSize, outputStream); length += ReadWriteIOUtils.write(numberOfChunks, outputStream); - length += ReadWriteIOUtils.write(minPlanIndex, outputStream); - length += ReadWriteIOUtils.write(maxPlanIndex, outputStream); return length; } @@ -163,15 +151,6 @@ public class ChunkGroupFooter { public String toString() { return "CHUNK_GROUP_FOOTER{" + "deviceID='" + deviceID + '\'' + ", dataSize=" + dataSize + ", numberOfChunks=" - + numberOfChunks + ", serializedSize=" + serializedSize + ", logIndex=[" + minPlanIndex - + "," + maxPlanIndex + "]}"; - } - - public long getMinPlanIndex() { - return minPlanIndex; - } - - public long getMaxPlanIndex() { - return maxPlanIndex; + + numberOfChunks + ", serializedSize=" + serializedSize; } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index f6d8f7c..6bdaaf2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -680,6 +680,21 @@ public class TsFileSequenceReader implements AutoCloseable { return buffer.getLong(); } + public void readPlanIndex() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) { + throw new IOException("reach the end of the file."); + } + buffer.flip(); + minPlanIndex = buffer.getLong(); + buffer.clear(); + if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) { + throw new IOException("reach the end of the file."); + } + buffer.flip(); + maxPlanIndex = buffer.getLong(); + } + /** * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This * method is not threadsafe. @@ -965,8 +980,6 @@ public class TsFileSequenceReader implements AutoCloseable { // if there is something wrong with the ChunkGroup Footer, we will drop this ChunkGroup // because we can not guarantee the correctness of the deviceId. ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter(); - minPlanIndex = Math.min(minPlanIndex, chunkGroupFooter.getMinPlanIndex()); - maxPlanIndex = Math.max(maxPlanIndex, chunkGroupFooter.getMaxPlanIndex()); deviceID = chunkGroupFooter.getDeviceID(); if (newSchema != null) { for (MeasurementSchema tsSchema : measurementSchemaList) { @@ -986,6 +999,9 @@ public class TsFileSequenceReader implements AutoCloseable { versionInfo.add(new Pair<>(position(), version)); truncatedSize = this.position(); break; + case MetaMarker.OPERATION_INDEX_RANGE: + readPlanIndex(); + break; default: // the disk file is corrupted, using this file may be dangerous throw new IOException("Unexpected marker " + marker); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 6afe6ac..0a77706 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -91,6 +91,8 @@ public class TsFileIOWriter { // for upgrade tool Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap; + // the two longs marks the index range of operations in current MemTable + // and are serialized after MetaMarker.OPERATION_INDEX_RANGE to recover file-level range private long minPlanIndex; private long maxPlanIndex; @@ -160,7 +162,7 @@ public class TsFileIOWriter { } long dataSize = out.getPosition() - currentChunkGroupStartOffset; ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupDeviceId, dataSize, - chunkMetadataList.size(), minPlanIndex, maxPlanIndex); + chunkMetadataList.size()); chunkGroupFooter.serializeTo(out.wrapAsStream()); chunkGroupMetadataList .add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList)); @@ -428,6 +430,12 @@ public class TsFileIOWriter { versionInfo.add(new Pair<>(getPos(), version)); } + public void writePlanIndices() throws IOException { + ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream()); + ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream()); + ReadWriteIOUtils.write(maxPlanIndex, out.wrapAsStream()); + } + public void setDefaultVersionPair() { // only happen when using tsfile module write api if (versionInfo.isEmpty()) { diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java index 768d7ea..b8623f7 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTest.java @@ -62,7 +62,7 @@ public class TsFileSequenceReaderTest { } @Test - public void testReadTsFileSequently() throws IOException { + public void testReadTsFileSequentially() throws IOException { TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH); reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER .getBytes().length); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java index 61e4786..c6050dd 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java @@ -66,6 +66,9 @@ public class TsFileIOWriterTest { writer.endChunkGroup(); writer.writeVersion(0L); + writer.setMinPlanIndex(100); + writer.setMaxPlanIndex(10000); + writer.writePlanIndices(); // end file writer.endFile(); } @@ -104,6 +107,11 @@ public class TsFileIOWriterTest { reader.readVersion(); + Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); + reader.readPlanIndex(); + Assert.assertEquals(100, reader.getMinPlanIndex()); + Assert.assertEquals(10000, reader.getMaxPlanIndex()); + Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); // FileMetaData
