This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch cp_upgrade_commit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f4e741e67ddf9ae3a64036b9a695989ca07a6a14 Author: Haonan <[email protected]> AuthorDate: Wed Jun 9 13:37:17 2021 +0800 [ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376) --- .../engine/storagegroup/StorageGroupProcessor.java | 3 +++ .../iotdb/db/engine/upgrade/UpgradeTask.java | 7 ++++- .../apache/iotdb/db/tools/TsFileRewriteTool.java | 12 +++------ .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 31 ++++++++++++++++------ 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index af54491..77d68b1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -2180,6 +2180,7 @@ public class StorageGroupProcessor { return; } for (TsFileResource resource : resources) { + resource.writeLock(); try { UpgradeUtils.moveUpgradedFiles(resource); tsFileManagement.addAll(resource.getUpgradedResources(), isseq); @@ -2193,6 +2194,8 @@ public class StorageGroupProcessor { resource.getTsFile().getAbsolutePath() + "," + UpgradeCheckStatus.UPGRADE_SUCCESS); } catch (IOException e) { logger.error("Unable to load {}, caused by ", resource, e); + } finally { + resource.writeUnlock(); } } // delete upgrade folder when it is empty diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java index 76b1bac..bcf29d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java @@ -139,8 +139,13 @@ public class UpgradeTask extends WrappedRunnable { } File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0"); File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade"); - + if (upgradeDir == null) { + continue; + } File[] tmpPartitionDirList = upgradeDir.listFiles(); + if (tmpPartitionDirList == null) { + continue; + } for (File tmpPartitionDir : tmpPartitionDirList) { if (tmpPartitionDir.isDirectory()) { try { diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java index a1c1a2d..c52457b 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java @@ -320,17 +320,12 @@ public class TsFileRewriteTool implements AutoCloseable { List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i); List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i); valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); - boolean isOnlyOnePageChunk = pageDataInChunk.size() == 1; for (int j = 0; j < pageDataInChunk.size(); j++) { if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) { decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup); } else { writePageInToFile( - schema, - pageHeadersInChunk.get(j), - pageDataInChunk.get(j), - chunkWritersInChunkGroup, - isOnlyOnePageChunk); + schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup); } } } @@ -390,15 +385,14 @@ public class TsFileRewriteTool implements AutoCloseable { IMeasurementSchema schema, PageHeader pageHeader, ByteBuffer pageData, - Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup, - boolean isOnlyOnePageChunk) + Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) throws PageException { long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime()); getOrDefaultTsFileIOWriter(oldTsFile, partitionId); Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>()); ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema)); - chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader, isOnlyOnePageChunk); + chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader); chunkWriters.put(schema, chunkWriter); chunkWritersInChunkGroup.put(partitionId, chunkWriters); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java index 7204a81..e66ed69 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java @@ -299,8 +299,7 @@ public class ChunkWriterImpl implements IChunkWriter { if (numOfPages == 0) { // record the firstPageStatistics this.firstPageStatistics = pageWriter.getStatistics(); this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true); - } else if (numOfPages == 1 - && firstPageStatistics != null) { // put the firstPageStatistics into pageBuffer + } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer byte[] b = pageBuffer.toByteArray(); pageBuffer.reset(); pageBuffer.write(b, 0, this.sizeWithoutStatistic); @@ -379,16 +378,32 @@ public class ChunkWriterImpl implements IChunkWriter { * write the page header and data into the PageWriter's output stream. @NOTE: for upgrading * 0.11/v2 to 0.12/v3 TsFile */ - public void writePageHeaderAndDataIntoBuff( - ByteBuffer data, PageHeader header, boolean isOnlyOnePageChunk) throws PageException { - + public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header) + throws PageException { // write the page header to pageBuffer try { logger.debug( "start to flush a page header into buffer, buffer position {} ", pageBuffer.size()); - ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer); - ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer); - if (!isOnlyOnePageChunk) { + // serialize pageHeader see writePageToPageBuffer method + if (numOfPages == 0) { // record the firstPageStatistics + this.firstPageStatistics = header.getStatistics(); + this.sizeWithoutStatistic += + ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer); + this.sizeWithoutStatistic += + ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer); + } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer + byte[] b = pageBuffer.toByteArray(); + pageBuffer.reset(); + pageBuffer.write(b, 0, this.sizeWithoutStatistic); + firstPageStatistics.serialize(pageBuffer); + pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic); + ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer); + ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer); + header.getStatistics().serialize(pageBuffer); + firstPageStatistics = null; + } else { + ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer); + ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer); header.getStatistics().serialize(pageBuffer); } logger.debug(
