HTHou commented on a change in pull request #983:
URL: https://github.com/apache/incubator-iotdb/pull/983#discussion_r422661924



##########
File path: 
server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.OldTsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsfileOnlineUpgradeTool implements AutoCloseable {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(TsfileOnlineUpgradeTool.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+  protected String file;
+  
+  // PartitionId -> TsFileIOWriter 
+  private Map<Long, TsFileIOWriter> partitionWriterMap;
+
+  /**
+   * Create a file reader of the given file. The reader will read the tail of the file to get the
+   * file metadata size. Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length()
+   * bytes of the file for preparing reading real data.
+   *
+   * @param file the data file
+   * @throws IOException If some I/O error occurs
+   */
+  public TsfileOnlineUpgradeTool(String file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * construct function for TsfileOnlineUpgradeTool.
+   *
+   * @param file -given file name
+   * @param loadMetadataSize -load meta data size
+   */
+  public TsfileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws 
IOException {
+    this.file = file;
+    final java.nio.file.Path path = Paths.get(file);
+    tsFileInput = new LocalTsFileInput(path);
+    partitionWriterMap = new HashMap<>();
+    try {
+      if (loadMetadataSize) {
+        loadMetadataSize();
+      }
+    } catch (Exception e) {
+      tsFileInput.close();
+      throw e;
+    }
+  }
+
+  /**
+   * 
+   */
+  public void loadMetadataSize() throws IOException {
+    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    tsFileInput.read(metadataSize,
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - 
Integer.BYTES);
+    metadataSize.flip();
+    // read file metadata size and position
+    fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+    fileMetadataPos =
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - 
Integer.BYTES
+            - fileMetadataSize;
+    // skip the magic header
+    position(TSFileConfig.MAGIC_STRING.length());
+  }
+
+  public String readTailMagic() throws IOException {
+    long totalSize = tsFileInput.size();
+
+    ByteBuffer magicStringBytes = 
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    tsFileInput.read(magicStringBytes, totalSize - 
TSFileConfig.MAGIC_STRING.length());
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * whether the file is a complete TsFile: only if the head magic and tail magic string exist.
+   */
+  public boolean isComplete() throws IOException {
+    return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.length() * 2 && 
readTailMagic()
+        .equals(readHeadMagic());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * @param movePosition whether to move the position of the file reader after reading the magic
+   * header to the end of the magic head string.
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
+    ByteBuffer magicStringBytes = 
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * this function reads version number and checks compatibility of TsFile.
+   */
+  public String readVersionNumber() throws IOException {
+    ByteBuffer versionNumberBytes = ByteBuffer
+        .allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
+    tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(versionNumberBytes);
+    versionNumberBytes.flip();
+    return new String(versionNumberBytes.array());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsFileMetadata readFileMetadata() throws IOException {
+    return OldTsFileMetadata.deserializeFrom(readData(fileMetadataPos, 
fileMetadataSize));
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsDeviceMetadata readTsDeviceMetaData(OldTsDeviceMetadataIndex 
index) throws IOException {
+    return OldTsDeviceMetadata.deserializeFrom(readData(index.getOffset(), 
index.getLen()));
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
+   * This method is not threadsafe.
+   *
+   * @return a CHUNK_GROUP_FOOTER
+   * @throws IOException io error
+   */
+  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), 
true);
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
+   * method is not threadsafe.
+   *
+   * @return a CHUNK_HEADER
+   * @throws IOException io error
+   */
+  public ChunkHeader readChunkHeader() throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true, 
true);
+  }
+
+  /**
+   * not thread safe.
+   *
+   * @param type given tsfile data type
+   */
+  public PageHeader readPageHeader(TSDataType type) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, 
true);
+  }
+
+  public ByteBuffer readPage(PageHeader header, CompressionType type)
+      throws IOException {
+    ByteBuffer buffer = readData(-1, header.getCompressedSize());
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+    ByteBuffer uncompressedBuffer = 
ByteBuffer.allocate(header.getUncompressedSize());
+    if (type == CompressionType.UNCOMPRESSED) {
+      return buffer;
+    }
+    unCompressor.uncompress(buffer.array(), buffer.position(), 
buffer.remaining(),
+        uncompressedBuffer.array(),
+        0);
+    return uncompressedBuffer;
+  }
+  
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize());
+  }
+
+  public long position() throws IOException {
+    return tsFileInput.position();
+  }
+
+  public void position(long offset) throws IOException {
+    tsFileInput.position(offset);
+  }
+
+  /**
+   * read one byte from the input. <br> this method is not thread safe
+   */
+  public byte readMarker() throws IOException {
+    markerBuffer.clear();
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
+    markerBuffer.flip();
+    return markerBuffer.get();
+  }
+
+  public byte readMarker(long position) throws IOException {
+    return readData(position, Byte.BYTES).get();
+  }
+
+  public void close() throws IOException {
+    this.tsFileInput.close();
+  }
+
+  public String getFileName() {
+    return this.file;
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1), or the given
+   * position. <br> if position = -1, the tsFileInput's position will be changed to the current
+   * position + real data size that has been read. Otherwise, the tsFileInput's position is not
+   * changed.
+   *
+   * @param position the start position of data in the tsFileInput, or the current position if
+   * position = -1
+   * @param size the size of data that want to read
+   * @return data that been read.
+   */
+  private ByteBuffer readData(long position, int size) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    if (position == -1) {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    } else {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) 
!= size) {
+        throw new IOException("reach the end of the data");
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * upgrade file and resource, return the boolean value whether upgrade task completes
+   * @throws IOException, WriteProcessException 
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has correct header 
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();

Review comment:
       Can you please explain the reason?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to