This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
     new adecc618e5 temp for external sort
adecc618e5 is described below

commit adecc618e52ee63601bd174bfb56fbba6010b6d9
Author: LiuXuxin <[email protected]>
AuthorDate: Mon Sep 5 17:54:27 2022 +0800

    temp for external sort
---
 .../write/writer/MemoryControlTsFileIOWriter.java  | 296 ++++++++++++++++-----
 .../writer/MemoryControlTsFileIOWriterTest.java    |  25 +-
 2 files changed, 247 insertions(+), 74 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index 5a63121295..e84970ced1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,23 +37,25 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
   private static final Logger LOG = 
LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
-  private long maxMetadataSize;
-  private long currentChunkMetadataSize = 0L;
-  private File chunkMetadataTempFile;
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
   protected LocalTsFileOutput tempOutput;
-  protected LocalTsFileInput tempInput;
-  private final boolean needSort;
-  private List<Long> sortedSegmentPosition = new ArrayList<>();
-  private ByteBuffer typeBuffer = ByteBuffer.allocate(1);
-  private ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+  protected final boolean needSort;
+  protected Queue<Long> sortedSegmentPosition = new ArrayDeque<>();
 
   public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
+  private static final String SORTING_TEMP_FILE = ".scmt";
   private static final byte VECTOR_TYPE = 1;
   private static final byte NORMAL_TYPE = 2;
 
@@ -69,7 +72,6 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
     currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
     super.endCurrentChunk();
     if (currentChunkMetadataSize > maxMetadataSize) {
-      // TODO: Sort and flush the chunk metadata to outside
       try {
         sortAndFlushChunkMetadata();
       } catch (IOException e) {
@@ -90,100 +92,252 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
     for (Map.Entry<Path, List<IChunkMetadata>> entry : 
chunkMetadataListMap.entrySet()) {
       Path seriesPath = entry.getKey();
       List<IChunkMetadata> iChunkMetadataList = entry.getValue();
-      if (iChunkMetadataList.size() > 0
-          && iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
-        // this is a vector chunk
-        writeAlignedChunkMetadata(iChunkMetadataList, seriesPath);
-      } else {
-        writeNormalChunkMetadata(iChunkMetadataList, seriesPath);
-      }
+      writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
     }
   }
 
-  private void writeAlignedChunkMetadata(List<IChunkMetadata> 
iChunkMetadataList, Path seriesPath)
+  private void writeChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
       throws IOException {
-    ReadWriteIOUtils.write(VECTOR_TYPE, tempOutput);
-    IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
-    List<IChunkMetadata> currentValueChunk = new ArrayList<>();
-    List<AlignedChunkMetadata> alignedChunkMetadata = new ArrayList<>();
-    for (int i = 1; i < iChunkMetadataList.size(); ++i) {
-      if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
+    if (iChunkMetadataList.size() == 0) {
+      return;
+    }
+    if (iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
+      IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
+      List<IChunkMetadata> currentValueChunk = new ArrayList<>();
+      List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
+      for (int i = 1; i < iChunkMetadataList.size(); ++i) {
+        if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
+          alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
+          currentTimeChunk = iChunkMetadataList.get(i);
+          currentValueChunk = new ArrayList<>();
+        } else {
+          currentValueChunk.add(iChunkMetadataList.get(i));
+        }
+      }
+      if (currentValueChunk.size() > 0) {
         alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
-        currentTimeChunk = iChunkMetadataList.get(i);
-        currentValueChunk = new ArrayList<>();
-      } else {
-        currentValueChunk.add(iChunkMetadataList.get(i));
       }
+      writeAlignedChunkMetadata(alignedChunkMetadata, seriesPath, output);
+    } else {
+      writeNormalChunkMetadata(iChunkMetadataList, seriesPath, output);
     }
-    if (currentValueChunk.size() > 0) {
-      alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
-    }
-    for (IChunkMetadata chunkMetadata : alignedChunkMetadata) {
+  }
+
+  private void writeAlignedChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
+      throws IOException {
+    ReadWriteIOUtils.write(VECTOR_TYPE, output);
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
       PublicBAOS buffer = new PublicBAOS();
       int size = chunkMetadata.serializeWithFullInfo(buffer, 
seriesPath.getDevice());
-      ReadWriteIOUtils.write(size, tempOutput);
-      buffer.writeTo(tempOutput);
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
     }
   }
 
-  private void writeNormalChunkMetadata(List<IChunkMetadata> 
iChunkMetadataList, Path seriesPath)
+  private void writeNormalChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
       throws IOException {
-    ReadWriteIOUtils.write(NORMAL_TYPE, tempOutput);
+    ReadWriteIOUtils.write(NORMAL_TYPE, output);
     for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
       PublicBAOS buffer = new PublicBAOS();
       int size = chunkMetadata.serializeWithFullInfo(buffer, 
seriesPath.getFullPath());
-      ReadWriteIOUtils.write(size, tempOutput);
-      buffer.writeTo(tempOutput);
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
     }
   }
 
-  protected Pair<String, IChunkMetadata> readNextChunkMetadata() throws 
IOException {
-    if (tempInput == null) {
-      tempInput = new LocalTsFileInput(chunkMetadataTempFile.toPath());
-    }
-    byte type = readNextChunkMetadataType();
-    int size = readNextChunkMetadataSize();
-    ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
-    ReadWriteIOUtils.readAsPossible(tempInput, chunkBuffer);
-    chunkBuffer.flip();
-    if (type == NORMAL_TYPE) {
-      ChunkMetadata chunkMetadata = new ChunkMetadata();
-      String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer, 
chunkMetadata);
-      return new Pair<>(seriesPath, chunkMetadata);
+  @Override
+  public void endFile() throws IOException {
+    if (this.sortedSegmentPosition.size() > 0) {
+      // there is some chunk metadata already been written to the disk
+      sortAndFlushChunkMetadata();
+      tempOutput.close();
     } else {
-      AlignedChunkMetadata chunkMetadata = new AlignedChunkMetadata();
-      String devicePath = 
AlignedChunkMetadata.deserializeWithFullInfo(chunkBuffer, chunkMetadata);
-      return new Pair<>(devicePath, chunkMetadata);
+      // sort the chunk metadata in memory, and just close the file
+      tempOutput.close();
+      super.endFile();
+      return;
     }
-  }
 
-  private byte readNextChunkMetadataType() throws IOException {
-    typeBuffer.clear();
-    ReadWriteIOUtils.readAsPossible(tempInput, typeBuffer);
-    typeBuffer.flip();
-    return ReadWriteIOUtils.readByte(typeBuffer);
-  }
+    if (needSort) {
+      externalSort();
+    }
 
-  private int readNextChunkMetadataSize() throws IOException {
-    sizeBuffer.clear();
-    ReadWriteIOUtils.readAsPossible(tempInput, sizeBuffer);
-    sizeBuffer.flip();
-    return ReadWriteIOUtils.readInt(sizeBuffer);
+    //         super.endFile();
   }
 
-  @Override
-  public void endFile() {
-    //         super.endFile();
+  protected void externalSort() throws IOException {
+    ChunkMetadataComparator comparator = new ChunkMetadataComparator();
+    int totalSegmentCount = this.sortedSegmentPosition.size();
+    File currentInFile = this.chunkMetadataTempFile;
+    File currentOutFile = new File(this.file.getAbsolutePath() + 
SORTING_TEMP_FILE);
+    LocalTsFileInput inputForWindow1 = null;
+    LocalTsFileInput inputForWindow2 = null;
+    LocalTsFileOutput output = null;
+    while (totalSegmentCount > 1) {
+      try {
+        inputForWindow1 = new LocalTsFileInput(currentInFile.toPath());
+        inputForWindow2 = new LocalTsFileInput(currentInFile.toPath());
+        output = new LocalTsFileOutput(new FileOutputStream(currentOutFile));
+        totalSegmentCount = 0;
+        Queue<Long> newSortedSegmentPosition = new ArrayDeque<>();
+        while (sortedSegmentPosition.size() > 0) {
+          long startPositionForWindow1 = sortedSegmentPosition.poll();
+          if (sortedSegmentPosition.size() == 0) {
+            // Just leave it alone, and record the position
+            newSortedSegmentPosition.add(startPositionForWindow1);
+            continue;
+          }
+          long startPositionForWindow2 = sortedSegmentPosition.poll();
+          ChunkMetadataExternalSortWindow firstWindow =
+              new ChunkMetadataExternalSortWindow(
+                  startPositionForWindow1, startPositionForWindow2, 
inputForWindow1);
+          ChunkMetadataExternalSortWindow secondWindow =
+              new ChunkMetadataExternalSortWindow(
+                  startPositionForWindow2,
+                  sortedSegmentPosition.size() > 0
+                      ? sortedSegmentPosition.element()
+                      : this.chunkMetadataTempFile.length(),
+                  inputForWindow2);
+          firstWindow.getNextSeriesNameAndChunkMetadata();
+          secondWindow.getNextSeriesNameAndChunkMetadata();
+          newSortedSegmentPosition.add(output.getPosition());
+          while (firstWindow.hasNextChunkMetadata() && 
secondWindow.hasNextChunkMetadata()) {
+            Pair<String, IChunkMetadata> pairOfFirstWindow =
+                firstWindow.getCurrentSeriesNameAndChunkMetadata();
+            Pair<String, IChunkMetadata> pairOfSecondWindow =
+                secondWindow.getCurrentSeriesNameAndChunkMetadata();
+            Pair<String, IChunkMetadata> pairToWritten = null;
+            if (comparator.compare(pairOfFirstWindow, pairOfSecondWindow) < 0) 
{
+              pairToWritten = pairOfFirstWindow;
+              if (firstWindow.hasNextChunkMetadata()) {
+                firstWindow.getNextSeriesNameAndChunkMetadata();
+              }
+            } else {
+              pairToWritten = pairOfSecondWindow;
+              if (secondWindow.hasNextChunkMetadata()) {
+                secondWindow.getNextSeriesNameAndChunkMetadata();
+              }
+            }
+            // serialize the chunk to the output
+            if (pairToWritten.right instanceof AlignedChunkMetadata) {
+              writeAlignedChunkMetadata(
+                  Collections.singletonList(pairToWritten.right),
+                  new Path(pairToWritten.left),
+                  output);
+            } else {
+              writeNormalChunkMetadata(
+                  Collections.singletonList(pairToWritten.right),
+                  new Path(pairToWritten.left),
+                  output);
+            }
+          }
+        }
+
+        output.close();
+        inputForWindow1.close();
+        inputForWindow2.close();
+        FileUtils.delete(currentInFile);
+        currentOutFile.renameTo(currentInFile);
+        File tempFile = currentOutFile;
+        currentOutFile = currentInFile;
+        currentInFile = tempFile;
+      } finally {
+        if (inputForWindow1 != null) {
+          inputForWindow1.close();
+        }
+        if (inputForWindow2 != null) {
+          inputForWindow2.close();
+        }
+        if (output != null) {
+          output.close();
+        }
+      }
+    }
   }
 
   @Override
   public void close() throws IOException {
     super.close();
-    if (tempInput != null) {
-      tempInput.close();
-    }
     if (tempOutput != null) {
       this.tempOutput.close();
     }
   }
+
+  protected static class ChunkMetadataComparator
+      implements Comparator<Pair<String, IChunkMetadata>> {
+
+    @Override
+    public int compare(Pair<String, IChunkMetadata> o1, Pair<String, 
IChunkMetadata> o2) {
+      String seriesNameOfO1 = o1.left;
+      String seriesNameOfO2 = o2.left;
+      int lexicographicalOrder = seriesNameOfO1.compareTo(seriesNameOfO2);
+      if (lexicographicalOrder != 0) {
+        return lexicographicalOrder;
+      } else {
+        return Long.compare(o1.right.getStartTime(), o2.right.getStartTime());
+      }
+    }
+  }
+
+  protected class ChunkMetadataExternalSortWindow {
+
+    final LocalTsFileInput input;
+    final long startPosition;
+    final long endPosition;
+    final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+    final ByteBuffer typeBuffer = ByteBuffer.allocate(1);
+    Pair<String, IChunkMetadata> currentPair = null;
+
+    ChunkMetadataExternalSortWindow(long startPosition, long endPosition, 
LocalTsFileInput input)
+        throws IOException {
+      this.startPosition = startPosition;
+      this.endPosition = endPosition;
+      this.input = input;
+      this.input.position(startPosition);
+    }
+
+    public boolean hasNextChunkMetadata() throws IOException {
+      return currentPair != null || this.input.position() < endPosition;
+    }
+
+    public Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata() 
throws IOException {
+      byte type = readNextChunkMetadataType();
+      int size = readNextChunkMetadataSize();
+      ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
+      ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+      chunkBuffer.flip();
+      if (type == NORMAL_TYPE) {
+        ChunkMetadata chunkMetadata = new ChunkMetadata();
+        String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer, 
chunkMetadata);
+        currentPair = new Pair<>(seriesPath, chunkMetadata);
+      } else {
+        AlignedChunkMetadata chunkMetadata = new AlignedChunkMetadata();
+        String devicePath =
+            AlignedChunkMetadata.deserializeWithFullInfo(chunkBuffer, 
chunkMetadata);
+        currentPair = new Pair<>(devicePath, chunkMetadata);
+      }
+      return currentPair;
+    }
+
+    public Pair<String, IChunkMetadata> getCurrentSeriesNameAndChunkMetadata() 
{
+      return currentPair;
+    }
+
+    private byte readNextChunkMetadataType() throws IOException {
+      typeBuffer.clear();
+      ReadWriteIOUtils.readAsPossible(input, typeBuffer);
+      typeBuffer.flip();
+      return ReadWriteIOUtils.readByte(typeBuffer);
+    }
+
+    private int readNextChunkMetadataSize() throws IOException {
+      sizeBuffer.clear();
+      ReadWriteIOUtils.readAsPossible(input, sizeBuffer);
+      sizeBuffer.flip();
+      return ReadWriteIOUtils.readInt(sizeBuffer);
+    }
+  }
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
index e894901a0a..25bd192974 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -100,8 +101,14 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
       writer.sortAndFlushChunkMetadata();
       writer.tempOutput.flush();
 
+      ChunkMetadataExternalSortWindow window =
+          writer
+          .new ChunkMetadataExternalSortWindow(
+              0,
+              writer.chunkMetadataTempFile.length(),
+              new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
       for (int i = 0; i < originChunkMetadataList.size(); ++i) {
-        Pair<String, IChunkMetadata> chunkMetadataPair = 
writer.readNextChunkMetadata();
+        Pair<String, IChunkMetadata> chunkMetadataPair = 
window.getNextSeriesNameAndChunkMetadata();
         Assert.assertEquals("root.sg.d" + i / 5 + ".s" + i % 5, 
chunkMetadataPair.left);
         Assert.assertEquals(
             originChunkMetadataList.get(i).getStartTime(), 
chunkMetadataPair.right.getStartTime());
@@ -150,8 +157,14 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
             new AlignedChunkMetadata(currentTimeChunkMetadata, 
currentValueChunkMetadata));
       }
 
+      ChunkMetadataExternalSortWindow window =
+          writer
+          .new ChunkMetadataExternalSortWindow(
+              0,
+              writer.chunkMetadataTempFile.length(),
+              new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
       for (int i = 0; i < alignedChunkMetadata.size(); ++i) {
-        Pair<String, IChunkMetadata> chunkMetadataPair = 
writer.readNextChunkMetadata();
+        Pair<String, IChunkMetadata> chunkMetadataPair = 
window.getNextSeriesNameAndChunkMetadata();
         Assert.assertEquals("root.sg.d" + i, chunkMetadataPair.left);
         Assert.assertEquals(
             alignedChunkMetadata.get(i).getStartTime(), 
chunkMetadataPair.right.getStartTime());
@@ -213,8 +226,14 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
       writer.sortAndFlushChunkMetadata();
       writer.tempOutput.flush();
 
+      ChunkMetadataExternalSortWindow window =
+          writer
+          .new ChunkMetadataExternalSortWindow(
+              0,
+              writer.chunkMetadataTempFile.length(),
+              new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
       for (int i = 0, deviceCnt = 0; i < originChunkMetadataList.size(); ++i) {
-        Pair<String, IChunkMetadata> chunkMetadataPair = 
writer.readNextChunkMetadata();
+        Pair<String, IChunkMetadata> chunkMetadataPair = 
window.getNextSeriesNameAndChunkMetadata();
         if (originChunkMetadataList.get(i) instanceof ChunkMetadata) {
           Assert.assertEquals(
               "root.sg.d" + deviceCnt + "." + 
originChunkMetadataList.get(i).getMeasurementUid(),

Reply via email to