This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
new adecc618e5 temp for external sort
adecc618e5 is described below
commit adecc618e52ee63601bd174bfb56fbba6010b6d9
Author: LiuXuxin <[email protected]>
AuthorDate: Mon Sep 5 17:54:27 2022 +0800
temp for external sort
---
.../write/writer/MemoryControlTsFileIOWriter.java | 296 ++++++++++++++++-----
.../writer/MemoryControlTsFileIOWriterTest.java | 25 +-
2 files changed, 247 insertions(+), 74 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index 5a63121295..e84970ced1 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,23 +37,25 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
- private long maxMetadataSize;
- private long currentChunkMetadataSize = 0L;
- private File chunkMetadataTempFile;
+  // Threshold for currentChunkMetadataSize: once the running size exceeds it after a chunk ends,
+  // sortAndFlushChunkMetadata() is called to spill the buffered chunk metadata.
+ protected long maxMetadataSize;
+  // Running sum of calculateRamSize() over the chunk metadata added since construction.
+ protected long currentChunkMetadataSize = 0L;
+  // Temp file holding spilled chunk metadata; it is also the input of every externalSort() pass.
+ protected File chunkMetadataTempFile;
+  // Output that spilled chunk metadata is serialized into via writeChunkMetadata(...).
protected LocalTsFileOutput tempOutput;
- protected LocalTsFileInput tempInput;
- private final boolean needSort;
- private List<Long> sortedSegmentPosition = new ArrayList<>();
- private ByteBuffer typeBuffer = ByteBuffer.allocate(1);
- private ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+  // When true, endFile() merges the spilled sorted runs with externalSort().
+ protected final boolean needSort;
+  // Start offsets of the sorted runs in chunkMetadataTempFile, in file order; externalSort()
+  // merges neighbouring runs pairwise.
+ protected Queue<Long> sortedSegmentPosition = new ArrayDeque<>();
+  // NOTE(review): despite the name, ".cmt" looks like a file-name suffix; confirm at its use site.
public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
+  // Suffix appended to the TsFile path to name the output file of an external-sort pass.
+ private static final String SORTING_TEMP_FILE = ".scmt";
+  // Type tags written before serialized chunk metadata: aligned (vector) vs. non-aligned series.
private static final byte VECTOR_TYPE = 1;
private static final byte NORMAL_TYPE = 2;
@@ -69,7 +72,6 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
super.endCurrentChunk();
if (currentChunkMetadataSize > maxMetadataSize) {
- // TODO: Sort and flush the chunk metadata to outside
try {
sortAndFlushChunkMetadata();
} catch (IOException e) {
@@ -90,100 +92,252 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
for (Map.Entry<Path, List<IChunkMetadata>> entry :
chunkMetadataListMap.entrySet()) {
Path seriesPath = entry.getKey();
List<IChunkMetadata> iChunkMetadataList = entry.getValue();
- if (iChunkMetadataList.size() > 0
- && iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
- // this is a vector chunk
- writeAlignedChunkMetadata(iChunkMetadataList, seriesPath);
- } else {
- writeNormalChunkMetadata(iChunkMetadataList, seriesPath);
- }
+ writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
}
}
- private void writeAlignedChunkMetadata(List<IChunkMetadata>
iChunkMetadataList, Path seriesPath)
+  /**
+   * Serializes the chunk metadata of one series to {@code output}.
+   *
+   * <p>An empty list writes nothing. If the first entry has data type VECTOR, the flat list is
+   * regrouped into AlignedChunkMetadata objects: every VECTOR (time) entry starts a new group and
+   * the non-VECTOR entries after it become that group's value chunks; the groups are then written
+   * with writeAlignedChunkMetadata. Otherwise the entries are written as-is with
+   * writeNormalChunkMetadata.
+   *
+   * @param iChunkMetadataList chunk metadata of one series, in the order it was collected
+   * @param seriesPath path of the series (its device is used for aligned entries)
+   * @param output destination of the serialized records
+   * @throws IOException if writing to {@code output} fails
+   */
+ private void writeChunkMetadata(
+ List<IChunkMetadata> iChunkMetadataList, Path seriesPath,
LocalTsFileOutput output)
throws IOException {
- ReadWriteIOUtils.write(VECTOR_TYPE, tempOutput);
- IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
- List<IChunkMetadata> currentValueChunk = new ArrayList<>();
- List<AlignedChunkMetadata> alignedChunkMetadata = new ArrayList<>();
- for (int i = 1; i < iChunkMetadataList.size(); ++i) {
- if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
+ if (iChunkMetadataList.size() == 0) {
+ return;
+ }
+ if (iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
+ IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
+ List<IChunkMetadata> currentValueChunk = new ArrayList<>();
+ List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
+ for (int i = 1; i < iChunkMetadataList.size(); ++i) {
+ if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
+ alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
+ currentTimeChunk = iChunkMetadataList.get(i);
+ currentValueChunk = new ArrayList<>();
+ } else {
+ currentValueChunk.add(iChunkMetadataList.get(i));
+ }
+ }
+      // NOTE(review): the last time chunk is only kept when it has value chunks, while groups
+      // closed inside the loop are kept even if empty; confirm this asymmetry is intended.
+ if (currentValueChunk.size() > 0) {
alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
- currentTimeChunk = iChunkMetadataList.get(i);
- currentValueChunk = new ArrayList<>();
- } else {
- currentValueChunk.add(iChunkMetadataList.get(i));
}
+ writeAlignedChunkMetadata(alignedChunkMetadata, seriesPath, output);
+ } else {
+ writeNormalChunkMetadata(iChunkMetadataList, seriesPath, output);
}
- if (currentValueChunk.size() > 0) {
- alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
- }
- for (IChunkMetadata chunkMetadata : alignedChunkMetadata) {
+ }
+
+  /**
+   * Appends aligned chunk metadata of one device to {@code output}.
+   *
+   * <p>Each entry is written as {@code [VECTOR_TYPE][int size][serialized bytes]}. The type tag is
+   * repeated for every entry because ChunkMetadataExternalSortWindow reads a tag before each
+   * record; writing it only once per list desynchronizes that reader as soon as a device has more
+   * than one entry.
+   *
+   * @param iChunkMetadataList aligned chunk metadata to serialize, in the order to write
+   * @param seriesPath path whose device is stored with every entry
+   * @param output destination of the records
+   * @throws IOException if writing to {@code output} fails
+   */
+  private void writeAlignedChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
+      throws IOException {
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      ReadWriteIOUtils.write(VECTOR_TYPE, output);
PublicBAOS buffer = new PublicBAOS();
int size = chunkMetadata.serializeWithFullInfo(buffer,
seriesPath.getDevice());
- ReadWriteIOUtils.write(size, tempOutput);
- buffer.writeTo(tempOutput);
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
}
}
- private void writeNormalChunkMetadata(List<IChunkMetadata>
iChunkMetadataList, Path seriesPath)
+  /**
+   * Appends non-aligned chunk metadata of one series to {@code output}.
+   *
+   * <p>Each entry is written as {@code [NORMAL_TYPE][int size][serialized bytes]}, with the type
+   * tag repeated per entry to match ChunkMetadataExternalSortWindow, which reads one tag per
+   * record.
+   *
+   * @param iChunkMetadataList chunk metadata of the series, in the order to write
+   * @param seriesPath full path stored with every entry
+   * @param output destination of the records
+   * @throws IOException if writing to {@code output} fails
+   */
+  private void writeNormalChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
throws IOException {
- ReadWriteIOUtils.write(NORMAL_TYPE, tempOutput);
for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      ReadWriteIOUtils.write(NORMAL_TYPE, output);
PublicBAOS buffer = new PublicBAOS();
int size = chunkMetadata.serializeWithFullInfo(buffer,
seriesPath.getFullPath());
- ReadWriteIOUtils.write(size, tempOutput);
- buffer.writeTo(tempOutput);
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
}
}
- protected Pair<String, IChunkMetadata> readNextChunkMetadata() throws
IOException {
- if (tempInput == null) {
- tempInput = new LocalTsFileInput(chunkMetadataTempFile.toPath());
- }
- byte type = readNextChunkMetadataType();
- int size = readNextChunkMetadataSize();
- ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
- ReadWriteIOUtils.readAsPossible(tempInput, chunkBuffer);
- chunkBuffer.flip();
- if (type == NORMAL_TYPE) {
- ChunkMetadata chunkMetadata = new ChunkMetadata();
- String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer,
chunkMetadata);
- return new Pair<>(seriesPath, chunkMetadata);
+  /**
+   * Finishes the file.
+   *
+   * <p>If sorted runs were already spilled to the temp file, the chunk metadata still in memory is
+   * spilled as well via sortAndFlushChunkMetadata(), the temp output is closed and, when needSort
+   * is set, the runs are merged with externalSort(). Otherwise the temp output (if it was ever
+   * created) is closed and the parent implementation finishes the file.
+   *
+   * @throws IOException if spilling, merging or finishing the file fails
+   */
+ @Override
+ public void endFile() throws IOException {
+ if (this.sortedSegmentPosition.size() > 0) {
+ // there is some chunk metadata already been written to the disk
+ sortAndFlushChunkMetadata();
+ tempOutput.close();
} else {
- AlignedChunkMetadata chunkMetadata = new AlignedChunkMetadata();
- String devicePath =
AlignedChunkMetadata.deserializeWithFullInfo(chunkBuffer, chunkMetadata);
- return new Pair<>(devicePath, chunkMetadata);
+      // Nothing was spilled to the temp file, so let the parent writer finish the file.
+      // tempOutput may never have been created on this path (close() guards against null too).
+      if (tempOutput != null) {
+        tempOutput.close();
+      }
+      super.endFile();
+      return;
}
- }
- private byte readNextChunkMetadataType() throws IOException {
- typeBuffer.clear();
- ReadWriteIOUtils.readAsPossible(tempInput, typeBuffer);
- typeBuffer.flip();
- return ReadWriteIOUtils.readByte(typeBuffer);
- }
+    if (needSort) {
+      externalSort();
+    }
- private int readNextChunkMetadataSize() throws IOException {
- sizeBuffer.clear();
- ReadWriteIOUtils.readAsPossible(tempInput, sizeBuffer);
- sizeBuffer.flip();
- return ReadWriteIOUtils.readInt(sizeBuffer);
+    // TODO: this spilled path does not call super.endFile() yet, so the externally sorted chunk
+    // metadata in chunkMetadataTempFile is not written into the TsFile here.
}
- @Override
- public void endFile() {
- // super.endFile();
+  /**
+   * Merges the sorted runs recorded in {@code sortedSegmentPosition} pairwise until at most one run
+   * remains, leaving {@code chunkMetadataTempFile} ordered by series name and then start time.
+   *
+   * <p>Each pass reads {@code chunkMetadataTempFile}, writes the merged runs to a sibling file named
+   * after the TsFile with the {@code .scmt} suffix, replaces the temp file with that output and
+   * records the new run offsets. An odd run at the end of a pass is copied through unchanged so no
+   * chunk metadata is lost. The temp output must already be closed (endFile() closes it first).
+   *
+   * @throws IOException if the temp files cannot be read, written, deleted or moved
+   */
+  protected void externalSort() throws IOException {
+    ChunkMetadataComparator comparator = new ChunkMetadataComparator();
+    File sortingFile = new File(this.file.getAbsolutePath() + SORTING_TEMP_FILE);
+    while (sortedSegmentPosition.size() > 1) {
+      // The last run of a pass ends at the end of the current input, not at a recorded offset.
+      long inputLength = chunkMetadataTempFile.length();
+      Queue<Long> newSortedSegmentPosition = new ArrayDeque<>();
+      LocalTsFileInput inputForWindow1 = null;
+      LocalTsFileInput inputForWindow2 = null;
+      LocalTsFileOutput output = null;
+      try {
+        inputForWindow1 = new LocalTsFileInput(chunkMetadataTempFile.toPath());
+        inputForWindow2 = new LocalTsFileInput(chunkMetadataTempFile.toPath());
+        output = new LocalTsFileOutput(new FileOutputStream(sortingFile));
+        while (!sortedSegmentPosition.isEmpty()) {
+          long firstStart = sortedSegmentPosition.poll();
+          long firstEnd =
+              sortedSegmentPosition.isEmpty() ? inputLength : sortedSegmentPosition.element();
+          // Offsets refer to the output file, which becomes the input of the next pass.
+          newSortedSegmentPosition.add(output.getPosition());
+          ChunkMetadataExternalSortWindow firstWindow =
+              new ChunkMetadataExternalSortWindow(firstStart, firstEnd, inputForWindow1);
+          if (sortedSegmentPosition.isEmpty()) {
+            // Odd run out: copy it unchanged; it gets merged in a later pass.
+            copyWindow(firstWindow, output);
+            break;
+          }
+          long secondStart = sortedSegmentPosition.poll();
+          long secondEnd =
+              sortedSegmentPosition.isEmpty() ? inputLength : sortedSegmentPosition.element();
+          ChunkMetadataExternalSortWindow secondWindow =
+              new ChunkMetadataExternalSortWindow(secondStart, secondEnd, inputForWindow2);
+          mergeWindows(firstWindow, secondWindow, comparator, output);
+        }
+      } finally {
+        // Close each resource exactly once, even if an earlier close fails.
+        try {
+          if (output != null) {
+            output.close();
+          }
+        } finally {
+          try {
+            if (inputForWindow1 != null) {
+              inputForWindow1.close();
+            }
+          } finally {
+            if (inputForWindow2 != null) {
+              inputForWindow2.close();
+            }
+          }
+        }
+      }
+      // Replace the input with this pass's output. FileUtils reports failures as IOException
+      // instead of the easily ignored boolean returned by File#renameTo.
+      FileUtils.delete(chunkMetadataTempFile);
+      FileUtils.moveFile(sortingFile, chunkMetadataTempFile);
+      sortedSegmentPosition = newSortedSegmentPosition;
+    }
}
+
+  /**
+   * Merges two sorted windows into {@code output}. On ties the first window wins, so the merge is
+   * stable with respect to run order. Whatever remains in either window once the other is
+   * exhausted is appended unchanged.
+   */
+  private void mergeWindows(
+      ChunkMetadataExternalSortWindow firstWindow,
+      ChunkMetadataExternalSortWindow secondWindow,
+      ChunkMetadataComparator comparator,
+      LocalTsFileOutput output)
+      throws IOException {
+    Pair<String, IChunkMetadata> first = firstWindow.getNextSeriesNameAndChunkMetadata();
+    Pair<String, IChunkMetadata> second = secondWindow.getNextSeriesNameAndChunkMetadata();
+    while (first != null && second != null) {
+      if (comparator.compare(first, second) <= 0) {
+        writeSingleChunkMetadata(first, output);
+        first = firstWindow.getNextSeriesNameAndChunkMetadata();
+      } else {
+        writeSingleChunkMetadata(second, output);
+        second = secondWindow.getNextSeriesNameAndChunkMetadata();
+      }
+    }
+    // One side is exhausted; the remainder of the other side is already sorted.
+    while (first != null) {
+      writeSingleChunkMetadata(first, output);
+      first = firstWindow.getNextSeriesNameAndChunkMetadata();
+    }
+    while (second != null) {
+      writeSingleChunkMetadata(second, output);
+      second = secondWindow.getNextSeriesNameAndChunkMetadata();
+    }
+  }
+
+  /** Copies every remaining entry of {@code window} to {@code output}, preserving order. */
+  private void copyWindow(ChunkMetadataExternalSortWindow window, LocalTsFileOutput output)
+      throws IOException {
+    for (Pair<String, IChunkMetadata> pair = window.getNextSeriesNameAndChunkMetadata();
+        pair != null;
+        pair = window.getNextSeriesNameAndChunkMetadata()) {
+      writeSingleChunkMetadata(pair, output);
+    }
+  }
+
+  /** Writes one (path, chunk metadata) entry to {@code output} in the temp-file record format. */
+  private void writeSingleChunkMetadata(
+      Pair<String, IChunkMetadata> pair, LocalTsFileOutput output) throws IOException {
+    List<IChunkMetadata> single = Collections.singletonList(pair.right);
+    // NOTE(review): for aligned entries pair.left is a device path; confirm that
+    // new Path(pair.left).getDevice() returns it unchanged.
+    Path seriesPath = new Path(pair.left);
+    if (pair.right instanceof AlignedChunkMetadata) {
+      writeAlignedChunkMetadata(single, seriesPath, output);
+    } else {
+      writeNormalChunkMetadata(single, seriesPath, output);
+    }
+  }
@Override
public void close() throws IOException {
super.close();
- if (tempInput != null) {
- tempInput.close();
- }
if (tempOutput != null) {
this.tempOutput.close();
}
}
+
+  /** Orders entries by series (or device) path, then by chunk start time. */
+  protected static class ChunkMetadataComparator
+      implements Comparator<Pair<String, IChunkMetadata>> {
+
+    @Override
+    public int compare(Pair<String, IChunkMetadata> o1, Pair<String, IChunkMetadata> o2) {
+      int lexicographicalOrder = o1.left.compareTo(o2.left);
+      if (lexicographicalOrder != 0) {
+        return lexicographicalOrder;
+      }
+      return Long.compare(o1.right.getStartTime(), o2.right.getStartTime());
+    }
+  }
+
+  /**
+   * Sequential reader over one sorted run, i.e. the byte range {@code [startPosition,
+   * endPosition)} of the chunk metadata temp file. Every record is laid out as {@code [byte
+   * type][int size][size bytes of serialized chunk metadata]}.
+   *
+   * <p>The window moves the position of {@code input} directly, so one input must not be read by
+   * two windows at the same time.
+   */
+  protected class ChunkMetadataExternalSortWindow {
+
+    final LocalTsFileInput input;
+    final long startPosition;
+    final long endPosition;
+    final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+    final ByteBuffer typeBuffer = ByteBuffer.allocate(1);
+    Pair<String, IChunkMetadata> currentPair = null;
+
+    ChunkMetadataExternalSortWindow(long startPosition, long endPosition, LocalTsFileInput input)
+        throws IOException {
+      this.startPosition = startPosition;
+      this.endPosition = endPosition;
+      this.input = input;
+      this.input.position(startPosition);
+    }
+
+    /**
+     * Returns whether the window still holds an entry: the current one returned by the last
+     * {@link #getNextSeriesNameAndChunkMetadata()} call, or an unread one before the run end.
+     */
+    public boolean hasNextChunkMetadata() throws IOException {
+      return currentPair != null || this.input.position() < endPosition;
+    }
+
+    /**
+     * Reads the next record of the run and makes it the current entry.
+     *
+     * @return the (series or device path, chunk metadata) pair, or {@code null} once the run is
+     *     exhausted; the current entry is then cleared and {@link #hasNextChunkMetadata()} returns
+     *     false
+     * @throws IOException if reading from the input fails
+     */
+    public Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata() throws IOException {
+      if (this.input.position() >= endPosition) {
+        // Clearing the current entry is what lets hasNextChunkMetadata() report exhaustion;
+        // otherwise callers keep reading past the end of this run into the next one.
+        currentPair = null;
+        return null;
+      }
+      byte type = readNextChunkMetadataType();
+      int size = readNextChunkMetadataSize();
+      ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
+      ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+      chunkBuffer.flip();
+      if (type == NORMAL_TYPE) {
+        ChunkMetadata chunkMetadata = new ChunkMetadata();
+        String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer, chunkMetadata);
+        currentPair = new Pair<>(seriesPath, chunkMetadata);
+      } else {
+        AlignedChunkMetadata chunkMetadata = new AlignedChunkMetadata();
+        String devicePath =
+            AlignedChunkMetadata.deserializeWithFullInfo(chunkBuffer, chunkMetadata);
+        currentPair = new Pair<>(devicePath, chunkMetadata);
+      }
+      return currentPair;
+    }
+
+    /** Returns the entry read by the last advance, or {@code null} if none or exhausted. */
+    public Pair<String, IChunkMetadata> getCurrentSeriesNameAndChunkMetadata() {
+      return currentPair;
+    }
+
+    private byte readNextChunkMetadataType() throws IOException {
+      typeBuffer.clear();
+      ReadWriteIOUtils.readAsPossible(input, typeBuffer);
+      typeBuffer.flip();
+      return ReadWriteIOUtils.readByte(typeBuffer);
+    }
+
+    private int readNextChunkMetadataSize() throws IOException {
+      sizeBuffer.clear();
+      ReadWriteIOUtils.readAsPossible(input, sizeBuffer);
+      sizeBuffer.flip();
+      return ReadWriteIOUtils.readInt(sizeBuffer);
+    }
+  }
}
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
index e894901a0a..25bd192974 100644
---
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
+++
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -100,8 +101,14 @@ public class MemoryControlTsFileIOWriterTest extends
MemoryControlTsFileIOWriter
writer.sortAndFlushChunkMetadata();
writer.tempOutput.flush();
+ ChunkMetadataExternalSortWindow window =
+ writer
+ .new ChunkMetadataExternalSortWindow(
+ 0,
+ writer.chunkMetadataTempFile.length(),
+ new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
for (int i = 0; i < originChunkMetadataList.size(); ++i) {
- Pair<String, IChunkMetadata> chunkMetadataPair =
writer.readNextChunkMetadata();
+ Pair<String, IChunkMetadata> chunkMetadataPair =
window.getNextSeriesNameAndChunkMetadata();
Assert.assertEquals("root.sg.d" + i / 5 + ".s" + i % 5,
chunkMetadataPair.left);
Assert.assertEquals(
originChunkMetadataList.get(i).getStartTime(),
chunkMetadataPair.right.getStartTime());
@@ -150,8 +157,14 @@ public class MemoryControlTsFileIOWriterTest extends
MemoryControlTsFileIOWriter
new AlignedChunkMetadata(currentTimeChunkMetadata,
currentValueChunkMetadata));
}
+ ChunkMetadataExternalSortWindow window =
+ writer
+ .new ChunkMetadataExternalSortWindow(
+ 0,
+ writer.chunkMetadataTempFile.length(),
+ new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
for (int i = 0; i < alignedChunkMetadata.size(); ++i) {
- Pair<String, IChunkMetadata> chunkMetadataPair =
writer.readNextChunkMetadata();
+ Pair<String, IChunkMetadata> chunkMetadataPair =
window.getNextSeriesNameAndChunkMetadata();
Assert.assertEquals("root.sg.d" + i, chunkMetadataPair.left);
Assert.assertEquals(
alignedChunkMetadata.get(i).getStartTime(),
chunkMetadataPair.right.getStartTime());
@@ -213,8 +226,14 @@ public class MemoryControlTsFileIOWriterTest extends
MemoryControlTsFileIOWriter
writer.sortAndFlushChunkMetadata();
writer.tempOutput.flush();
+ ChunkMetadataExternalSortWindow window =
+ writer
+ .new ChunkMetadataExternalSortWindow(
+ 0,
+ writer.chunkMetadataTempFile.length(),
+ new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
for (int i = 0, deviceCnt = 0; i < originChunkMetadataList.size(); ++i) {
- Pair<String, IChunkMetadata> chunkMetadataPair =
writer.readNextChunkMetadata();
+ Pair<String, IChunkMetadata> chunkMetadataPair =
window.getNextSeriesNameAndChunkMetadata();
if (originChunkMetadataList.get(i) instanceof ChunkMetadata) {
Assert.assertEquals(
"root.sg.d" + deviceCnt + "." +
originChunkMetadataList.get(i).getMeasurementUid(),