This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TYQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e176396c6cff0731636965a4a4c8de8150bbd9ed Author: JackieTien97 <[email protected]> AuthorDate: Wed Mar 10 20:25:25 2021 +0800 init --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 12 +- .../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 12 +- .../java/org/apache/iotdb/db/utils/MergeUtils.java | 24 +- .../iotdb/tsfile/file/header/ChunkHeader.java | 13 +- .../file/metadata/statistics/Statistics.java | 94 ++----- .../file/metadata/statistics/TimeStatistics.java | 164 ++++++++++++ .../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 4 + .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 2 + .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 34 +-- .../iotdb/tsfile/write/chunk/IChunkWriter.java | 18 +- .../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 269 +++++++++++++++++++ .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 264 +++++++++++++++++++ .../tsfile/write/chunk/VectorChunkWriterImpl.java | 199 ++++++++++++++ .../apache/iotdb/tsfile/write/page/PageWriter.java | 1 + .../iotdb/tsfile/write/page/TimePageWriter.java | 172 ++++++++++++ .../page/{PageWriter.java => ValuePageWriter.java} | 169 ++++++------ .../write/record/datapoint/BooleanDataPoint.java | 2 +- .../write/record/datapoint/DoubleDataPoint.java | 2 +- .../write/record/datapoint/FloatDataPoint.java | 2 +- .../write/record/datapoint/IntDataPoint.java | 2 +- .../write/record/datapoint/LongDataPoint.java | 2 +- .../write/record/datapoint/StringDataPoint.java | 2 +- .../tsfile/write/schema/IMeasurementSchema.java | 46 ++++ .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 72 +++-- .../iotdb/tsfile/write/TsFileIOWriterTest.java | 3 +- .../write/writer/RestorableTsFileIOWriterTest.java | 27 +- .../tsfile/write/writer/TestTsFileOutput.java | 52 ++++ .../tsfile/write/writer/TimeChunkWriterTest.java | 108 ++++++++ .../tsfile/write/writer/TimePageWriterTest.java | 172 ++++++++++++ .../tsfile/write/writer/ValueChunkWriterTest.java | 105 ++++++++ .../tsfile/write/writer/ValuePageWriterTest.java | 
290 +++++++++++++++++++++ .../write/writer/VectorChunkWriterImplTest.java | 176 +++++++++++++ .../write/writer/VectorMeasurementSchemaStub.java | 78 ++++++ 33 files changed, 2345 insertions(+), 247 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 9397af1..caa0893 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -173,22 +173,22 @@ public class MemTableFlushTask { switch (dataType) { case BOOLEAN: - seriesWriterImpl.write(time, tvPairs.getBoolean(i)); + seriesWriterImpl.write(time, tvPairs.getBoolean(i), false); break; case INT32: - seriesWriterImpl.write(time, tvPairs.getInt(i)); + seriesWriterImpl.write(time, tvPairs.getInt(i), false); break; case INT64: - seriesWriterImpl.write(time, tvPairs.getLong(i)); + seriesWriterImpl.write(time, tvPairs.getLong(i), false); break; case FLOAT: - seriesWriterImpl.write(time, tvPairs.getFloat(i)); + seriesWriterImpl.write(time, tvPairs.getFloat(i), false); break; case DOUBLE: - seriesWriterImpl.write(time, tvPairs.getDouble(i)); + seriesWriterImpl.write(time, tvPairs.getDouble(i), false); break; case TEXT: - seriesWriterImpl.write(time, tvPairs.getBinary(i)); + seriesWriterImpl.write(time, tvPairs.getBinary(i), false); break; default: LOGGER.error( diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java index b7e0842..b6e2316 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java @@ -412,22 +412,22 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { getOrDefaultTsFileIOWriter(oldTsFile, 
partitionId); switch (schema.getType()) { case INT32: - chunkWriter.write(time, (int) value); + chunkWriter.write(time, (int) value, false); break; case INT64: - chunkWriter.write(time, (long) value); + chunkWriter.write(time, (long) value, false); break; case FLOAT: - chunkWriter.write(time, (float) value); + chunkWriter.write(time, (float) value, false); break; case DOUBLE: - chunkWriter.write(time, (double) value); + chunkWriter.write(time, (double) value, false); break; case BOOLEAN: - chunkWriter.write(time, (boolean) value); + chunkWriter.write(time, (boolean) value, false); break; case TEXT: - chunkWriter.write(time, (Binary) value); + chunkWriter.write(time, (Binary) value, false); break; default: throw new UnSupportedDataTypeException( diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java index 973cb24..c3f39b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java @@ -53,22 +53,22 @@ public class MergeUtils { public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { switch (chunkWriter.getDataType()) { case TEXT: - chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); + chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary(), false); break; case DOUBLE: - chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); + chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble(), false); break; case BOOLEAN: - chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); + chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean(), false); break; case INT64: - chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); + chunkWriter.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getLong(), false); break; case INT32: - chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); + chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt(), false); break; case FLOAT: - chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); + chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat(), false); break; default: throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); @@ -109,22 +109,22 @@ public class MergeUtils { public static void writeBatchPoint(BatchData batchData, int i, IChunkWriter chunkWriter) { switch (chunkWriter.getDataType()) { case TEXT: - chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBinaryByIndex(i)); + chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBinaryByIndex(i), false); break; case DOUBLE: - chunkWriter.write(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i)); + chunkWriter.write(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), false); break; case BOOLEAN: - chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBooleanByIndex(i)); + chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBooleanByIndex(i), false); break; case INT64: - chunkWriter.write(batchData.getTimeByIndex(i), batchData.getLongByIndex(i)); + chunkWriter.write(batchData.getTimeByIndex(i), batchData.getLongByIndex(i), false); break; case INT32: - chunkWriter.write(batchData.getTimeByIndex(i), batchData.getIntByIndex(i)); + chunkWriter.write(batchData.getTimeByIndex(i), batchData.getIntByIndex(i), false); break; case FLOAT: - chunkWriter.write(batchData.getTimeByIndex(i), batchData.getFloatByIndex(i)); + chunkWriter.write(batchData.getTimeByIndex(i), batchData.getFloatByIndex(i), false); break; default: throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java index 663da60..26d3ca7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java @@ -58,8 +58,19 @@ public class ChunkHeader { CompressionType compressionType, TSEncoding encoding, int numOfPages) { + this(measurementID, dataSize, dataType, compressionType, encoding, numOfPages, 0); + } + + public ChunkHeader( + String measurementID, + int dataSize, + TSDataType dataType, + CompressionType compressionType, + TSEncoding encoding, + int numOfPages, + int mask) { this( - numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER, + (byte) ((numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER) | mask), measurementID, dataSize, getSerializedSize(measurementID, dataSize), diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 1ec2001..1c1ba7f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -77,6 +77,8 @@ public abstract class Statistics<T> { return new DoubleStatistics(); case FLOAT: return new FloatStatistics(); + case Vector: + return new TimeStatistics(); default: throw new UnknownColumnTypeException(type.toString()); } @@ -189,61 +191,36 @@ public abstract class Statistics<T> { } public void update(long time, boolean value) { - if (time < this.startTime) { - startTime = time; - } - if (time > this.endTime) { - endTime = time; - } - count++; + update(time); updateStats(value); } public void update(long time, int value) { - if (time < this.startTime) { - startTime = time; - } - if 
(time > this.endTime) { - endTime = time; - } - count++; + update(time); updateStats(value); } public void update(long time, long value) { - if (time < this.startTime) { - startTime = time; - } - if (time > this.endTime) { - endTime = time; - } - count++; + update(time); updateStats(value); } public void update(long time, float value) { - if (time < this.startTime) { - startTime = time; - } - if (time > this.endTime) { - endTime = time; - } - count++; + update(time); updateStats(value); } public void update(long time, double value) { - if (time < this.startTime) { - startTime = time; - } - if (time > this.endTime) { - endTime = time; - } - count++; + update(time); updateStats(value); } public void update(long time, Binary value) { + update(time); + updateStats(value); + } + + public void update(long time) { if (time < startTime) { startTime = time; } @@ -251,65 +228,39 @@ public abstract class Statistics<T> { endTime = time; } count++; - updateStats(value); } public void update(long[] time, boolean[] values, int batchSize) { - if (time[0] < startTime) { - startTime = time[0]; - } - if (time[batchSize - 1] > this.endTime) { - endTime = time[batchSize - 1]; - } - count += batchSize; + update(time, batchSize); updateStats(values, batchSize); } public void update(long[] time, int[] values, int batchSize) { - if (time[0] < startTime) { - startTime = time[0]; - } - if (time[batchSize - 1] > this.endTime) { - endTime = time[batchSize - 1]; - } - count += batchSize; + update(time, batchSize); updateStats(values, batchSize); } public void update(long[] time, long[] values, int batchSize) { - if (time[0] < startTime) { - startTime = time[0]; - } - if (time[batchSize - 1] > this.endTime) { - endTime = time[batchSize - 1]; - } - count += batchSize; + update(time, batchSize); updateStats(values, batchSize); } public void update(long[] time, float[] values, int batchSize) { - if (time[0] < startTime) { - startTime = time[0]; - } - if (time[batchSize - 1] > this.endTime) { - 
endTime = time[batchSize - 1]; - } - count += batchSize; + update(time, batchSize); updateStats(values, batchSize); } public void update(long[] time, double[] values, int batchSize) { - if (time[0] < startTime) { - startTime = time[0]; - } - if (time[batchSize - 1] > this.endTime) { - endTime = time[batchSize - 1]; - } - count += batchSize; + update(time, batchSize); updateStats(values, batchSize); } public void update(long[] time, Binary[] values, int batchSize) { + update(time, batchSize); + updateStats(values, batchSize); + } + + public void update(long[] time, int batchSize) { if (time[0] < startTime) { startTime = time[0]; } @@ -317,7 +268,6 @@ public abstract class Statistics<T> { endTime = time[batchSize - 1]; } count += batchSize; - updateStats(values, batchSize); } protected abstract void mergeStatisticsValue(Statistics stats); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java new file mode 100644 index 0000000..ba29554 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.file.metadata.statistics; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class TimeStatistics extends Statistics { + + static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40; + + @Override + public TSDataType getType() { + return TSDataType.Vector; + } + + @Override + public int getStatsSize() { + return 0; + } + + @Override + public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) { + throw new StatisticsClassException("Time statistics does not support: set min max from bytes"); + } + + @Override + public Long getMinValue() { + throw new StatisticsClassException("Time statistics does not support: min value"); + } + + @Override + public Long getMaxValue() { + throw new StatisticsClassException("Time statistics does not support: max value"); + } + + @Override + public Long getFirstValue() { + throw new StatisticsClassException("Time statistics does not support: first value"); + } + + @Override + public Long getLastValue() { + throw new StatisticsClassException("Time statistics does not support: last value"); + } + + @Override + public double getSumDoubleValue() { + throw new StatisticsClassException("Time statistics does not support: double sum"); + } + + @Override + public long getSumLongValue() { + throw new StatisticsClassException("Time statistics does not support: long sum"); + } + + @Override + void updateStats(long value) { + throw new StatisticsClassException("Time statistics does not support: update stats"); + } + + @Override + void updateStats(long[] values, int batchSize) { + throw new StatisticsClassException("Time statistics does not support: update stats"); + } + + @Override + public 
void updateStats(long minValue, long maxValue) { + throw new StatisticsClassException("Time statistics does not support: update stats"); + } + + @Override + public long calculateRamSize() { + return TIME_STATISTICS_FIXED_RAM_SIZE; + } + + @Override + protected void mergeStatisticsValue(Statistics stats) { + + } + + @Override + public byte[] getMinValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get min value bytes"); + } + + @Override + public byte[] getMaxValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get max value bytes"); + } + + @Override + public byte[] getFirstValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get first value bytes"); + } + + @Override + public byte[] getLastValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get last value bytes"); + } + + @Override + public byte[] getSumValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get sum value bytes"); + } + + @Override + public ByteBuffer getMinValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get min value bytes"); + } + + @Override + public ByteBuffer getMaxValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get max value buffer"); + } + + @Override + public ByteBuffer getFirstValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get first value buffer"); + } + + @Override + public ByteBuffer getLastValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get last value buffer"); + } + + @Override + public ByteBuffer getSumValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get sum value buffer"); + } + + @Override + public int serializeStats(OutputStream outputStream) { + return 0; + } + + @Override + public void deserialize(InputStream 
inputStream) throws IOException { + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java index 881db03..61efe8e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java @@ -86,4 +86,8 @@ public class PublicBAOS extends ByteArrayOutputStream { public int size() { return count; } + + public void truncate(int size) { + count = size; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java index e5ffb4e..2dce654 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java @@ -45,6 +45,8 @@ public abstract class TsPrimitiveType implements Serializable { return new TsPrimitiveType.TsDouble((double) v); case TEXT: return new TsPrimitiveType.TsBinary((Binary) v); + case Vector: + return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v); default: throw new UnSupportedDataTypeException("Unsupported data type:" + dataType); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java index 2c00f4a..0ae34ef 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java @@ -18,6 +18,10 @@ */ package org.apache.iotdb.tsfile.write.chunk; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.compress.ICompressor; 
import org.apache.iotdb.tsfile.encoding.encoder.SDTEncoder; @@ -32,15 +36,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.iotdb.tsfile.write.page.PageWriter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; - public class ChunkWriterImpl implements IChunkWriter { private static final Logger logger = LoggerFactory.getLogger(ChunkWriterImpl.class); @@ -145,7 +143,7 @@ public class ChunkWriterImpl implements IChunkWriter { } @Override - public void write(long time, long value) { + public void write(long time, long value, boolean isNull) { // store last point for sdtEncoding, it still needs to go through encoding process // in case it exceeds compdev and needs to store second last point if (!isSdtEncoding || sdtEncoder.encodeLong(time, value)) { @@ -160,7 +158,7 @@ public class ChunkWriterImpl implements IChunkWriter { } @Override - public void write(long time, int value) { + public void write(long time, int value, boolean isNull) { if (!isSdtEncoding || sdtEncoder.encodeInt(time, value)) { pageWriter.write( isSdtEncoding ? sdtEncoder.getTime() : time, @@ -173,13 +171,13 @@ public class ChunkWriterImpl implements IChunkWriter { } @Override - public void write(long time, boolean value) { + public void write(long time, boolean value, boolean isNull) { pageWriter.write(time, value); checkPageSizeAndMayOpenANewPage(); } @Override - public void write(long time, float value) { + public void write(long time, float value, boolean isNull) { if (!isSdtEncoding || sdtEncoder.encodeFloat(time, value)) { pageWriter.write( isSdtEncoding ? 
sdtEncoder.getTime() : time, @@ -193,7 +191,7 @@ public class ChunkWriterImpl implements IChunkWriter { } @Override - public void write(long time, double value) { + public void write(long time, double value, boolean isNull) { if (!isSdtEncoding || sdtEncoder.encodeDouble(time, value)) { pageWriter.write( isSdtEncoding ? sdtEncoder.getTime() : time, @@ -206,12 +204,17 @@ public class ChunkWriterImpl implements IChunkWriter { } @Override - public void write(long time, Binary value) { + public void write(long time, Binary value, boolean isNull) { pageWriter.write(time, value); checkPageSizeAndMayOpenANewPage(); } @Override + public void write(long time) { + throw new IllegalStateException("write time method is not implemented in common chunk writer"); + } + + @Override public void write(long[] timestamps, int[] values, int batchSize) { if (isSdtEncoding) { batchSize = sdtEncoder.encode(timestamps, values, batchSize); @@ -412,7 +415,7 @@ public class ChunkWriterImpl implements IChunkWriter { * @param statistics the chunk statistics * @throws IOException exception in IO */ - public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics) + private void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics) throws IOException { if (statistics.getCount() == 0) { return; @@ -420,13 +423,14 @@ public class ChunkWriterImpl implements IChunkWriter { // start to write this column chunk writer.startFlushChunk( - measurementSchema, + measurementSchema.getMeasurementId(), compressor.getType(), measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, pageBuffer.size(), - numOfPages); + numOfPages, + 0); long dataOffset = writer.getPos(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java index cab9615..70752ec 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java +++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java @@ -18,32 +18,34 @@ */ package org.apache.iotdb.tsfile.write.chunk; +import java.io.IOException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; -import java.io.IOException; - /** IChunkWriter provides a list of writing methods for different value types. */ public interface IChunkWriter { /** write a time value pair. */ - void write(long time, int value); + void write(long time, int value, boolean isNull); /** write a time value pair. */ - void write(long time, long value); + void write(long time, long value, boolean isNull); /** write a time value pair. */ - void write(long time, boolean value); + void write(long time, boolean value, boolean isNull); /** write a time value pair. */ - void write(long time, float value); + void write(long time, float value, boolean isNull); /** write a time value pair. */ - void write(long time, double value); + void write(long time, double value, boolean isNull); /** write a time value pair. */ - void write(long time, Binary value); + void write(long time, Binary value, boolean isNull); + + /** write a time. */ + void write(long time); /** write time series */ void write(long[] timestamps, int[] values, int batchSize); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java new file mode 100644 index 0000000..87c051e --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.chunk; + +import java.io.IOException; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.write.page.TimePageWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TimeChunkWriter { + + private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class); + + private final String measurementId; + + private final TSEncoding encodingType; + + private final CompressionType compressionType; + + /** + * all pages of this chunk. 
+ */ + private final PublicBAOS pageBuffer; + + private int numOfPages; + + /** + * write data into current page + */ + private TimePageWriter pageWriter; + + /** + * page size threshold. + */ + private final long pageSizeThreshold; + + private final int maxNumberOfPointsInPage; + + /** + * value count in current page. + */ + private int valueCountInOnePageForNextCheck; + + // initial value for valueCountInOnePageForNextCheck + private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500; + + /** + * statistic of this chunk. + */ + private TimeStatistics statistics; + + /** + * first page info + */ + private int sizeWithoutStatistic; + + private Statistics<?> firstPageStatistics; + + public TimeChunkWriter(String measurementId, CompressionType compressionType, + TSEncoding encodingType, Encoder timeEncoder) { + this.measurementId = measurementId; + this.encodingType = encodingType; + this.compressionType = compressionType; + this.pageBuffer = new PublicBAOS(); + + this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + this.maxNumberOfPointsInPage = + TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); + // initial check of memory usage. 
So that we have enough data to make an initial prediction + this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + + // init statistics for this chunk and page + this.statistics = new TimeStatistics(); + + this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType)); + + } + + public void write(long time) { + pageWriter.write(time); + } + + public void write(long[] timestamps, int batchSize) { + pageWriter.write(timestamps, batchSize); + } + + /** + * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it + * to pageBuffer + */ + public boolean checkPageSizeAndMayOpenANewPage() { + if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) { + logger.debug("current line count reaches the upper bound, write page {}", measurementId); + return true; + } else if (pageWriter.getPointNumber() + >= valueCountInOnePageForNextCheck) { // need to check memory size + // not checking the memory used for every value + long currentPageSize = pageWriter.estimateMaxMemSize(); + if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold + // we will write the current page + logger.debug( + "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}", + measurementId, + pageSizeThreshold, + currentPageSize, + pageWriter.getPointNumber()); + valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + return true; + } else { + // reset the valueCountInOnePageForNextCheck for the next page + valueCountInOnePageForNextCheck = + (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber()); + } + } + return false; + } + + public void writePageToPageBuffer() { + try { + if (numOfPages == 0) { // record the firstPageStatistics + this.firstPageStatistics = pageWriter.getStatistics(); + this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true); + } else if (numOfPages == 1) { // put the 
firstPageStatistics into pageBuffer + byte[] b = pageBuffer.toByteArray(); + pageBuffer.reset(); + pageBuffer.write(b, 0, this.sizeWithoutStatistic); + firstPageStatistics.serialize(pageBuffer); + pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic); + pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false); + firstPageStatistics = null; + } else { + pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false); + } + + // update statistics of this chunk + numOfPages++; + this.statistics.mergeStatistics(pageWriter.getStatistics()); + } catch (IOException e) { + logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e); + } finally { + // clear start time stamp for next initializing + pageWriter.reset(); + } + } + + public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException { + sealCurrentPage(); + writeAllPagesOfChunkToTsFile(tsfileWriter); + + // reinit this chunk writer + pageBuffer.reset(); + numOfPages = 0; + firstPageStatistics = null; + this.statistics = new TimeStatistics(); + } + + + public long estimateMaxSeriesMemSize() { + return pageBuffer.size() + + pageWriter.estimateMaxMemSize() + + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics() + + pageWriter.getStatistics().getSerializedSize(); + } + + + public long getCurrentChunkSize() { + if (pageBuffer.size() == 0) { + return 0; + } + // return the serialized size of the chunk header + all pages + return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size()) + + (long) pageBuffer.size(); + } + + + public void sealCurrentPage() { + if (pageWriter != null && pageWriter.getPointNumber() > 0) { + writePageToPageBuffer(); + } + } + + public void clearPageWriter() { + pageWriter = null; + } + + public int getNumOfPages() { + return numOfPages; + } + + public TSDataType getDataType() { + return TSDataType.Vector; + } + + /** + * write the page to specified IOWriter. 
+ * + * @param writer the specified IOWriter + * @throws IOException exception in IO + */ + public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) + throws IOException { + if (statistics.getCount() == 0) { + return; + } + + // start to write this column chunk + writer.startFlushChunk( + measurementId, + compressionType, + TSDataType.Vector, + encodingType, + statistics, + pageBuffer.size(), + numOfPages, + 0x80); + + long dataOffset = writer.getPos(); + + // write all pages of this column + writer.writeBytesToStream(pageBuffer); + + int dataSize = (int) (writer.getPos() - dataOffset); + if (dataSize != pageBuffer.size()) { + throw new IOException( + "Bytes written is inconsistent with the size of data: " + + dataSize + + " !=" + + " " + + pageBuffer.size()); + } + + writer.endCurrentChunk(); + } + + /** + * only used for test + */ + public PublicBAOS getPageBuffer() { + return pageBuffer; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java new file mode 100644 index 0000000..865036f --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.chunk; + +import java.io.IOException; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.write.page.ValuePageWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ValueChunkWriter { + + private static final Logger logger = LoggerFactory.getLogger(ValueChunkWriter.class); + + private final String measurementId; + + private final TSEncoding encodingType; + + + private final TSDataType dataType; + + private final CompressionType compressionType; + + /** + * all pages of this chunk. + */ + private final PublicBAOS pageBuffer; + + private int numOfPages; + + /** + * write data into current page + */ + private ValuePageWriter pageWriter; + + + /** + * statistic of this chunk. 
+ */ + private Statistics<?> statistics; + + /** + * first page info + */ + private int sizeWithoutStatistic; + + private Statistics<?> firstPageStatistics; + + public ValueChunkWriter(String measurementId, CompressionType compressionType, + TSDataType dataType, TSEncoding encodingType, Encoder valueEncoder) { + this.measurementId = measurementId; + this.encodingType = encodingType; + this.dataType = dataType; + this.compressionType = compressionType; + this.pageBuffer = new PublicBAOS(); + + // init statistics for this chunk and page + this.statistics = Statistics.getStatsByType(dataType); + + this.pageWriter = new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), + dataType); + + } + + public void write(long time, long value, boolean isNull) { + pageWriter.write(time, value, isNull); + } + + public void write(long time, int value, boolean isNull) { + pageWriter.write(time, value, isNull); + } + + public void write(long time, boolean value, boolean isNull) { + pageWriter.write(time, value, isNull); + } + + public void write(long time, float value, boolean isNull) { + pageWriter.write(time, value, isNull); + } + + public void write(long time, double value, boolean isNull) { + pageWriter.write(time, value, isNull); + } + + public void write(long time, Binary value, boolean isNull) { + pageWriter.write(time, value, isNull); + } + + public void write(long[] timestamps, int[] values, int batchSize) { + pageWriter.write(timestamps, values, batchSize); + } + + public void write(long[] timestamps, long[] values, int batchSize) { + pageWriter.write(timestamps, values, batchSize); + } + + public void write(long[] timestamps, boolean[] values, int batchSize) { + pageWriter.write(timestamps, values, batchSize); + } + + public void write(long[] timestamps, float[] values, int batchSize) { + pageWriter.write(timestamps, values, batchSize); + } + + public void write(long[] timestamps, double[] values, int batchSize) { + pageWriter.write(timestamps, 
values, batchSize); + } + + public void write(long[] timestamps, Binary[] values, int batchSize) { + pageWriter.write(timestamps, values, batchSize); + } + + public void writePageToPageBuffer() { + try { + if (numOfPages == 0) { // record the firstPageStatistics + this.firstPageStatistics = pageWriter.getStatistics(); + this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true); + } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer + byte[] b = pageBuffer.toByteArray(); + pageBuffer.reset(); + pageBuffer.write(b, 0, this.sizeWithoutStatistic); + firstPageStatistics.serialize(pageBuffer); + pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic); + pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false); + firstPageStatistics = null; + } else { + pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false); + } + + // update statistics of this chunk + numOfPages++; + this.statistics.mergeStatistics(pageWriter.getStatistics()); + } catch (IOException e) { + logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e); + } finally { + // clear start time stamp for next initializing + pageWriter.reset(dataType); + } + } + + + public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException { + sealCurrentPage(); + writeAllPagesOfChunkToTsFile(tsfileWriter); + + // reinit this chunk writer + pageBuffer.reset(); + numOfPages = 0; + firstPageStatistics = null; + this.statistics = Statistics.getStatsByType(dataType); + } + + public long estimateMaxSeriesMemSize() { + return pageBuffer.size() + + pageWriter.estimateMaxMemSize() + + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics() + + pageWriter.getStatistics().getSerializedSize(); + } + + public long getCurrentChunkSize() { + if (pageBuffer.size() == 0) { + return 0; + } + // return the serialized size of the chunk header + all pages + return ChunkHeader.getSerializedSize(measurementId, 
pageBuffer.size()) + + (long) pageBuffer.size(); + } + + public void sealCurrentPage() { + // if the page contains no points, we still need to serialize it + if (pageWriter != null && pageWriter.getSize() != 0) { + writePageToPageBuffer(); + } + } + + public void clearPageWriter() { + pageWriter = null; + } + + + public int getNumOfPages() { + return numOfPages; + } + + + public TSDataType getDataType() { + return dataType; + } + + + /** + * write the page to specified IOWriter. + * + * @param writer the specified IOWriter + * @throws IOException exception in IO + */ + public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) + throws IOException { + if (statistics.getCount() == 0) { + return; + } + + // start to write this column chunk + writer.startFlushChunk( + measurementId, + compressionType, + dataType, + encodingType, + statistics, + pageBuffer.size(), + numOfPages, + 0x40); + + long dataOffset = writer.getPos(); + + // write all pages of this column + writer.writeBytesToStream(pageBuffer); + + int dataSize = (int) (writer.getPos() - dataOffset); + if (dataSize != pageBuffer.size()) { + throw new IOException( + "Bytes written is inconsistent with the size of data: " + + dataSize + + " !=" + + " " + + pageBuffer.size()); + } + + writer.endCurrentChunk(); + } + + /** + * only used for test + */ + public PublicBAOS getPageBuffer() { + return pageBuffer; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java new file mode 100644 index 0000000..eb4b0e1 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.chunk; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +public class VectorChunkWriterImpl implements IChunkWriter { + + + private final TimeChunkWriter timeChunkWriter; + private final List<ValueChunkWriter> valueChunkWriterList; + private int valueIndex; + + /** + * @param schema schema of this measurement + */ + public VectorChunkWriterImpl(IMeasurementSchema schema) { + timeChunkWriter = new TimeChunkWriter(schema.getMeasurementId(), schema.getCompressor(), + schema.getTimeTSEncoding(), schema.getTimeEncoder()); + + List<String> valueMeasurementIdList = schema.getValueMeasurementIdList(); + List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList(); + List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList(); + List<Encoder> valueEncoderList = schema.getValueEncoderList(); + + valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size()); + for (int i= 0; i < valueMeasurementIdList.size(); i++) { + 
valueChunkWriterList.add(new ValueChunkWriter(valueMeasurementIdList.get(i), + schema.getCompressor(), + valueTSDataTypeList.get(i), valueTSEncodingList.get(i), valueEncoderList.get(i))); + } + + this.valueIndex = 0; + } + + @Override + public void write(long time, int value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, long value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, boolean value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, float value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, double value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, Binary value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time) { + valueIndex = 0; + timeChunkWriter.write(time); + if (checkPageSizeAndMayOpenANewPage()) { + writePageToPageBuffer(); + } + } + + // TODO tsfile write interface + @Override + public void write(long[] timestamps, int[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, long[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, boolean[] values, int batchSize) { + throw new UnsupportedOperationException(); + + } + + @Override + public void write(long[] timestamps, float[] values, int batchSize) { + throw new UnsupportedOperationException(); + + } + + @Override + public void write(long[] timestamps, double[] values, int batchSize) { + throw new UnsupportedOperationException(); + 
+ } + + @Override + public void write(long[] timestamps, Binary[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + /** + * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it + * to pageBuffer + */ + private boolean checkPageSizeAndMayOpenANewPage() { + return timeChunkWriter.checkPageSizeAndMayOpenANewPage(); + } + + private void writePageToPageBuffer() { + timeChunkWriter.writePageToPageBuffer(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.writePageToPageBuffer(); + } + } + + @Override + public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException { + timeChunkWriter.writeToFileWriter(tsfileWriter); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.writeToFileWriter(tsfileWriter); + } + } + + @Override + public long estimateMaxSeriesMemSize() { + long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize(); + } + return estimateMaxSeriesMemSize; + } + + @Override + public long getCurrentChunkSize() { + long currentChunkSize = timeChunkWriter.getCurrentChunkSize(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + currentChunkSize += valueChunkWriter.getCurrentChunkSize(); + } + return currentChunkSize; + } + + @Override + public void sealCurrentPage() { + timeChunkWriter.sealCurrentPage(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.sealCurrentPage(); + } + } + + @Override + public void clearPageWriter() { + timeChunkWriter.clearPageWriter(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.clearPageWriter(); + } + } + + @Override + public int getNumOfPages() { + return timeChunkWriter.getNumOfPages(); + } + + @Override + public TSDataType 
getDataType() { + return TSDataType.Vector; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java index 8467d15..369b809 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java @@ -265,6 +265,7 @@ public class PageWriter { /** reset this page */ public void reset(MeasurementSchema measurementSchema) { timeOut.reset(); + timeEncoder = measurementSchema.getTimeEncoder(); valueOut.reset(); statistics = Statistics.getStatsByType(measurementSchema.getType()); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java new file mode 100644 index 0000000..9a1336c --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.tsfile.write.page; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This writer is used to write time into a page. It consists of a time encoder and respective OutputStream. + */ +public class TimePageWriter { + + private static final Logger logger = LoggerFactory.getLogger(TimePageWriter.class); + + private final ICompressor compressor; + + // time + private Encoder timeEncoder; + private final PublicBAOS timeOut; + + /** + * statistic of current page. It will be reset after calling {@code + * writePageHeaderAndDataIntoBuff()} + */ + private TimeStatistics statistics; + + public TimePageWriter(Encoder timeEncoder, ICompressor compressor) { + this.timeOut = new PublicBAOS(); + this.timeEncoder = timeEncoder; + this.statistics = new TimeStatistics(); + this.compressor = compressor; + } + + /** write a time into encoder */ + public void write(long time) { + timeEncoder.encode(time, timeOut); + statistics.update(time); + } + + /** write time series into encoder */ + public void write(long[] timestamps, int batchSize) { + for (int i = 0; i < batchSize; i++) { + timeEncoder.encode(timestamps[i], timeOut); + } + statistics.update(timestamps, batchSize); + } + + /** flush all data remained in encoders. 
*/ + private void prepareEndWriteOnePage() throws IOException { + timeEncoder.flush(timeOut); + } + + /** + * getUncompressedBytes return data what it has been written in form of <code> + * size of time list, time list, value list</code> + * + * @return a new readable ByteBuffer whose position is 0. + */ + public ByteBuffer getUncompressedBytes() throws IOException { + prepareEndWriteOnePage(); + ByteBuffer buffer = ByteBuffer.allocate(timeOut.size()); + buffer.put(timeOut.getBuf(), 0, timeOut.size()); + buffer.flip(); + return buffer; + } + + /** write the page header and data into the PageWriter's output stream. */ + public int writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer, boolean first) + throws IOException { + if (statistics.getCount() == 0) { + return 0; + } + + ByteBuffer pageData = getUncompressedBytes(); + int uncompressedSize = pageData.remaining(); + int compressedSize; + byte[] compressedBytes = null; + + if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) { + compressedSize = uncompressedSize; + } else { + compressedBytes = new byte[compressor.getMaxBytesForCompression(uncompressedSize)]; + // data is never a directByteBuffer now, so we can use data.array() + compressedSize = + compressor.compress( + pageData.array(), pageData.position(), uncompressedSize, compressedBytes); + } + + // write the page header to IOWriter + int sizeWithoutStatistic = 0; + if (first) { + sizeWithoutStatistic += + ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer); + sizeWithoutStatistic += + ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer); + } else { + ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer); + ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer); + statistics.serialize(pageBuffer); + } + + // write page content to temp PBAOS + logger.trace("start to flush a time page data into buffer, buffer position {} ", pageBuffer.size()); + if 
(compressor.getType().equals(CompressionType.UNCOMPRESSED)) { + try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) { + channel.write(pageData); + } + } else { + pageBuffer.write(compressedBytes, 0, compressedSize); + } + logger.trace("finish flushing a time page data into buffer, buffer position {} ", pageBuffer.size()); + return sizeWithoutStatistic; + } + + /** + * calculate max possible memory size it occupies, including time outputStream and value + * outputStream, because size outputStream is never used until flushing. + * + * @return allocated size in time, value and outputStream + */ + public long estimateMaxMemSize() { + return timeOut.size() + timeEncoder.getMaxByteSize(); + } + + /** reset this page */ + public void reset() { + timeOut.reset(); + statistics = new TimeStatistics(); + } + + public void setTimeEncoder(Encoder encoder) { + this.timeEncoder = encoder; + } + + public void initStatistics() { + statistics = new TimeStatistics(); + } + + public long getPointNumber() { + return statistics.getCount(); + } + + public TimeStatistics getStatistics() { + return statistics; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java similarity index 69% copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java index 8467d15..db20cf7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java @@ -18,6 +18,10 @@ */ package org.apache.iotdb.tsfile.write.page; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.iotdb.tsfile.compress.ICompressor; import org.apache.iotdb.tsfile.encoding.encoder.Encoder; import 
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -26,32 +30,20 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; - /** - * This writer is used to write time-value into a page. It consists of a time encoder, a value - * encoder and respective OutputStream. + * This writer is used to write value into a page. It consists of a value encoder and respective OutputStream. */ -public class PageWriter { +public class ValuePageWriter { + private static final Logger logger = LoggerFactory.getLogger(ValuePageWriter.class); - private static final Logger logger = LoggerFactory.getLogger(PageWriter.class); + private final ICompressor compressor; - private ICompressor compressor; - - // time - private Encoder timeEncoder; - private PublicBAOS timeOut; // value private Encoder valueEncoder; - private PublicBAOS valueOut; + private final PublicBAOS valueOut; /** * statistic of current page. 
It will be reset after calling {@code @@ -59,76 +51,101 @@ public class PageWriter { */ private Statistics<?> statistics; - public PageWriter() { - this(null, null); - } + private byte bitmap; - public PageWriter(MeasurementSchema measurementSchema) { - this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder()); - this.statistics = Statistics.getStatsByType(measurementSchema.getType()); - this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor()); - } + private int size; + + private final PublicBAOS bitmapOut; - private PageWriter(Encoder timeEncoder, Encoder valueEncoder) { - this.timeOut = new PublicBAOS(); + private static final int MASK = 1 << 7; + + public ValuePageWriter(Encoder valueEncoder, ICompressor compressor, TSDataType dataType) { this.valueOut = new PublicBAOS(); - this.timeEncoder = timeEncoder; + this.bitmap = 0; + this.size = 0; + this.bitmapOut = new PublicBAOS(); this.valueEncoder = valueEncoder; + this.statistics = Statistics.getStatsByType(dataType); + this.compressor = compressor; } /** write a time value pair into encoder */ - public void write(long time, boolean value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - statistics.update(time, value); + public void write(long time, boolean value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } } /** write a time value pair into encoder */ - public void write(long time, short value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - statistics.update(time, value); + public void write(long time, short value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } } /** write a time value pair into encoder */ - public void write(long time, int value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - 
statistics.update(time, value); + public void write(long time, int value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } } /** write a time value pair into encoder */ - public void write(long time, long value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - statistics.update(time, value); + public void write(long time, long value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } } /** write a time value pair into encoder */ - public void write(long time, float value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - statistics.update(time, value); + public void write(long time, float value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } } /** write a time value pair into encoder */ - public void write(long time, double value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - statistics.update(time, value); + public void write(long time, double value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } } /** write a time value pair into encoder */ - public void write(long time, Binary value) { - timeEncoder.encode(time, timeOut); - valueEncoder.encode(value, valueOut); - statistics.update(time, value); + public void write(long time, Binary value, boolean isNull) { + setBit(isNull); + if (!isNull) { + valueEncoder.encode(value, valueOut); + statistics.update(time, value); + } + } + + private void setBit(boolean isNull) { + if (!isNull) { + bitmap |= (MASK >>> (size % 8)); + } + size++; + if (size % 8 == 0) { + bitmapOut.write(bitmap); + bitmap = 0; + } } /** write time series into encoder */ public void write(long[] timestamps, boolean[] values, int 
batchSize) { for (int i = 0; i < batchSize; i++) { - timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } statistics.update(timestamps, values, batchSize); @@ -137,7 +154,6 @@ public class PageWriter { /** write time series into encoder */ public void write(long[] timestamps, int[] values, int batchSize) { for (int i = 0; i < batchSize; i++) { - timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } statistics.update(timestamps, values, batchSize); @@ -146,7 +162,6 @@ public class PageWriter { /** write time series into encoder */ public void write(long[] timestamps, long[] values, int batchSize) { for (int i = 0; i < batchSize; i++) { - timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } statistics.update(timestamps, values, batchSize); @@ -155,7 +170,6 @@ public class PageWriter { /** write time series into encoder */ public void write(long[] timestamps, float[] values, int batchSize) { for (int i = 0; i < batchSize; i++) { - timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } statistics.update(timestamps, values, batchSize); @@ -164,7 +178,6 @@ public class PageWriter { /** write time series into encoder */ public void write(long[] timestamps, double[] values, int batchSize) { for (int i = 0; i < batchSize; i++) { - timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } statistics.update(timestamps, values, batchSize); @@ -173,7 +186,6 @@ public class PageWriter { /** write time series into encoder */ public void write(long[] timestamps, Binary[] values, int batchSize) { for (int i = 0; i < batchSize; i++) { - timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } statistics.update(timestamps, values, batchSize); @@ -181,8 +193,10 @@ public class PageWriter { /** flush all data remained in encoders. 
*/ private void prepareEndWriteOnePage() throws IOException { - timeEncoder.flush(timeOut); valueEncoder.flush(valueOut); + if (size % 8 != 0) { + bitmapOut.write(bitmap); + } } /** @@ -193,9 +207,9 @@ public class PageWriter { */ public ByteBuffer getUncompressedBytes() throws IOException { prepareEndWriteOnePage(); - ByteBuffer buffer = ByteBuffer.allocate(timeOut.size() + valueOut.size() + 4); - ReadWriteForEncodingUtils.writeUnsignedVarInt(timeOut.size(), buffer); - buffer.put(timeOut.getBuf(), 0, timeOut.size()); + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bitmapOut.size() + valueOut.size()); + buffer.putInt(size); + buffer.put(bitmapOut.getBuf(), 0, bitmapOut.size()); buffer.put(valueOut.getBuf(), 0, valueOut.size()); buffer.flip(); return buffer; @@ -204,7 +218,7 @@ public class PageWriter { /** write the page header and data into the PageWriter's output stream. */ public int writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer, boolean first) throws IOException { - if (statistics.getCount() == 0) { + if (size == 0) { return 0; } @@ -256,21 +270,16 @@ public class PageWriter { * @return allocated size in time, value and outputStream */ public long estimateMaxMemSize() { - return timeOut.size() - + valueOut.size() - + timeEncoder.getMaxByteSize() - + valueEncoder.getMaxByteSize(); + return Integer.BYTES + bitmapOut.size() + 1 + valueOut.size() + valueEncoder.getMaxByteSize(); } /** reset this page */ - public void reset(MeasurementSchema measurementSchema) { - timeOut.reset(); + public void reset(TSDataType dataType) { + bitmapOut.reset(); + size = 0; + bitmap = 0; valueOut.reset(); - statistics = Statistics.getStatsByType(measurementSchema.getType()); - } - - public void setTimeEncoder(Encoder encoder) { - this.timeEncoder = encoder; + statistics = Statistics.getStatsByType(dataType); } public void setValueEncoder(Encoder encoder) { @@ -288,4 +297,8 @@ public class PageWriter { public Statistics<?> getStatistics() { return statistics; } + + 
public int getSize() { + return size; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java index b607a79..e8b4bee 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java @@ -47,7 +47,7 @@ public class BooleanDataPoint extends DataPoint { LOG.warn("given IChunkWriter is null, do nothing and return"); return; } - writer.write(time, value); + writer.write(time, value, false); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java index b988ed7..853313e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java @@ -47,7 +47,7 @@ public class DoubleDataPoint extends DataPoint { LOG.warn("given IChunkWriter is null, do nothing and return"); return; } - writer.write(time, value); + writer.write(time, value, false); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java index 3d13f17..863be98 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java @@ -47,7 +47,7 @@ public class FloatDataPoint extends DataPoint { LOG.warn("given IChunkWriter is null, do nothing and return"); return; } - writer.write(time, value); + writer.write(time, value, false); } @Override diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java index a66378e..02e0d5c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java @@ -47,7 +47,7 @@ public class IntDataPoint extends DataPoint { LOG.warn("given IChunkWriter is null, do nothing and return"); return; } - writer.write(time, value); + writer.write(time, value, false); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java index 4bc2e9f..8dce510 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java @@ -47,7 +47,7 @@ public class LongDataPoint extends DataPoint { LOG.warn("given IChunkWriter is null, do nothing and return"); return; } - writer.write(time, value); + writer.write(time, value, false); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java index 3a3918c..cf371bc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java @@ -48,7 +48,7 @@ public class StringDataPoint extends DataPoint { LOG.warn("given IChunkWriter is null, do nothing and return"); return; } - writer.write(time, value); + writer.write(time, value, false); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java new file mode 100644 index 0000000..2098833 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.tsfile.write.schema; + +import java.util.List; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +public interface IMeasurementSchema { + + String getMeasurementId(); + + CompressionType getCompressor(); + + TSDataType getType(); + + TSEncoding getTimeTSEncoding(); + + Encoder getTimeEncoder(); + + List<String> getValueMeasurementIdList(); + + List<TSDataType> getValueTSDataTypeList(); + + List<TSEncoding> getValueTSEncodingList(); + + List<Encoder> getValueEncoderList(); +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 6dfeec8..dc1b59a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -18,6 +18,15 @@ */ package org.apache.iotdb.tsfile.write.writer; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.MetaMarker; @@ -39,21 +48,9 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import 
java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - /** * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream. */ @@ -92,8 +89,11 @@ public class TsFileIOWriter { private long minPlanIndex; private long maxPlanIndex; - /** empty construct function. */ - protected TsFileIOWriter() {} + /** + * empty construct function. + */ + protected TsFileIOWriter() { + } /** * for writing a new tsfile. @@ -121,6 +121,13 @@ public class TsFileIOWriter { } /** + * for test only + */ + public TsFileIOWriter(TsFileOutput output, boolean test) { + this.out = output; + } + + /** * Writes given bytes to output stream. This method is called when total memory size exceeds the * chunk group size threshold. * @@ -163,39 +170,44 @@ public class TsFileIOWriter { /** * start a {@linkplain ChunkMetadata ChunkMetaData}. * - * @param measurementSchema - schema of this time series + * @param measurementId - measurementId of this time series * @param compressionCodecName - compression name of this time series - * @param tsDataType - data type - * @param statistics - Chunk statistics - * @param dataSize - the serialized size of all pages + * @param tsDataType - data type + * @param statistics - Chunk statistics + * @param dataSize - the serialized size of all pages + * @param mask - 0x80 for time chunk, 0x40 for value chunk, 0x00 for common chunk * @throws IOException if I/O error occurs */ public void startFlushChunk( - MeasurementSchema measurementSchema, + String measurementId, CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, int dataSize, - int numOfPages) + int numOfPages, + int mask) throws IOException { currentChunkMetadata = new ChunkMetadata( - measurementSchema.getMeasurementId(), tsDataType, out.getPosition(), statistics); + measurementId, tsDataType, out.getPosition(), statistics); 
ChunkHeader header = new ChunkHeader( - measurementSchema.getMeasurementId(), + measurementId, dataSize, tsDataType, compressionCodecName, encodingType, - numOfPages); + numOfPages, + mask); header.serializeTo(out.wrapAsStream()); } - /** Write a whole chunk in another file into this file. Providing fast merge for IoTDB. */ + /** + * Write a whole chunk in another file into this file. Providing fast merge for IoTDB. + */ public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException { ChunkHeader chunkHeader = chunk.getHeader(); currentChunkMetadata = @@ -215,7 +227,9 @@ public class TsFileIOWriter { } } - /** end chunk and write some log. */ + /** + * end chunk and write some log. + */ public void endCurrentChunk() { chunkMetadataList.add(currentChunkMetadata); currentChunkMetadata = null; @@ -368,11 +382,11 @@ public class TsFileIOWriter { } void writeSeparatorMaskForTest() throws IOException { - out.write(new byte[] {MetaMarker.SEPARATOR}); + out.write(new byte[]{MetaMarker.SEPARATOR}); } void writeChunkMaskForTest() throws IOException { - out.write(new byte[] {MetaMarker.CHUNK_HEADER}); + out.write(new byte[]{MetaMarker.CHUNK_HEADER}); } public File getFile() { @@ -383,7 +397,9 @@ public class TsFileIOWriter { this.file = file; } - /** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */ + /** + * Remove such ChunkMetadata that its startTime is not in chunkStartTimes + */ public void filterChunks(Map<Path, List<Long>> chunkStartTimes) { Map<Path, Integer> startTimeIdxes = new HashMap<>(); chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0)); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java index 570ad8e..a0eb0e9 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java @@ -62,12 +62,13 @@ 
public class TsFileIOWriterTest { // chunk group 1 writer.startChunkGroup(deviceId); writer.startFlushChunk( - measurementSchema, + measurementSchema.getMeasurementId(), measurementSchema.getCompressor(), measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, + 0, 0); writer.endCurrentChunk(); writer.endChunkGroup(); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java index 64db027..fd7c83b 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java @@ -19,6 +19,16 @@ package org.apache.iotdb.tsfile.write.writer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileWriter; +import java.util.ArrayList; +import java.util.List; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.constant.TestConstant; import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException; @@ -42,20 +52,8 @@ import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import org.junit.Test; -import java.io.File; -import java.io.FileWriter; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - 
@SuppressWarnings("squid:S4042") // Suppress use java.nio.Files#delete warning public class RestorableTsFileIOWriterTest { @@ -131,13 +129,14 @@ public class RestorableTsFileIOWriterTest { writer .getIOWriter() .startFlushChunk( - new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN), + new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN).getMeasurementId(), CompressionType.SNAPPY, TSDataType.FLOAT, TSEncoding.PLAIN, new FloatStatistics(), 100, - 10); + 10, + 0); writer.getIOWriter().close(); RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java new file mode 100644 index 0000000..6317459 --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java @@ -0,0 +1,52 @@ +package org.apache.iotdb.tsfile.write.writer; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.utils.PublicBAOS; + +public class TestTsFileOutput implements TsFileOutput { + + PublicBAOS publicBAOS = new PublicBAOS(); + + @Override + public void write(byte[] b) throws IOException { + publicBAOS.write(b); + } + + @Override + public void write(byte b) { + publicBAOS.write(b); + } + + @Override + public void write(ByteBuffer b) { + publicBAOS.write(b.array(), b.position(), b.limit()); + } + + @Override + public long getPosition() { + return publicBAOS.size(); + } + + @Override + public void close() throws IOException { + publicBAOS.close(); + } + + @Override + public OutputStream wrapAsStream() { + return publicBAOS; + } + + @Override + public void flush() throws IOException { + publicBAOS.flush(); + + } + + @Override + public void truncate(long size) { + publicBAOS.truncate((int) size); + } +} diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java new file mode 100644 index 0000000..0c62d06 --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.tsfile.write.writer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter; +import org.junit.Test; + +public class TimeChunkWriterTest { + + @Test + public void testWrite1() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + TimeChunkWriter chunkWriter = new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, + TSEncoding.PLAIN, timeEncoder); + for (long time = 1; time <= 10; time++) { + chunkWriter.write(time); + } + assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage()); + chunkWriter.sealCurrentPage(); + // page without statistics size: 82 + chunk header size: 8 + assertEquals(90L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeAllPagesOfChunkToTsFile(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + assertEquals((byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer)); + assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + 
assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(82, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testWrite2() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + TimeChunkWriter chunkWriter = new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, + TSEncoding.PLAIN, timeEncoder); + for (long time = 1; time <= 10; time++) { + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + for (long time = 11; time <= 20; time++) { + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + assertEquals(2, chunkWriter.getNumOfPages()); + // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9 + assertEquals(207L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeAllPagesOfChunkToTsFile(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer)); + assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(198, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } +} diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java new file mode 100644 index 0000000..6adfb8c --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.tsfile.write.writer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.page.TimePageWriter; +import org.junit.Test; + +public class TimePageWriterTest { + + + @Test + public void testWrite() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + try { + pageWriter.write(1L); + assertEquals(8, pageWriter.estimateMaxMemSize()); + ByteBuffer buffer1 = pageWriter.getUncompressedBytes(); + ByteBuffer buffer = ByteBuffer.wrap(buffer1.array()); + pageWriter.reset(); + assertEquals(0, pageWriter.estimateMaxMemSize()); + byte[] timeBytes = new byte[8]; + buffer.get(timeBytes); + ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes); + PlainDecoder decoder = new PlainDecoder(); + assertEquals(1L, decoder.readLong(buffer2)); + decoder.reset(); + } catch (IOException e) { + fail(); + } + } + + + @Test + public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = 
ICompressor.getCompressor(CompressionType.UNCOMPRESSED); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + pageWriter.write(1L); + pageWriter.write(2L); + pageWriter.write(3L); + // without page statistics + assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true)); + // total size + assertEquals(26, publicBAOS.size()); + TimeStatistics statistics = pageWriter.getStatistics(); + assertEquals(1L, statistics.getStartTime()); + assertEquals(3L, statistics.getEndTime()); + assertEquals(3, statistics.getCount()); + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // uncompressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + // compressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(1L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(2L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(3L, ReadWriteIOUtils.readLong(buffer)); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + pageWriter.write(1L); + pageWriter.write(2L); + pageWriter.write(3L); + // with page statistics + assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false)); + // total size + assertEquals(43, publicBAOS.size()); + TimeStatistics statistics = pageWriter.getStatistics(); + assertEquals(1L, statistics.getStartTime()); + assertEquals(3L, statistics.getEndTime()); + assertEquals(3, statistics.getCount()); + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // uncompressedSize + assertEquals(24, 
ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + // compressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + TimeStatistics testStatistics = (TimeStatistics) TimeStatistics + .deserialize(buffer, TSDataType.Vector); + assertEquals(1L, testStatistics.getStartTime()); + assertEquals(3L, testStatistics.getEndTime()); + assertEquals(3, testStatistics.getCount()); + assertEquals(1L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(2L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(3L, ReadWriteIOUtils.readLong(buffer)); + } catch (IOException e) { + fail(); + } + } + + + @Test + public void testWritePageHeaderAndDataIntoBuffWithSnappy() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + pageWriter.write(1L); + pageWriter.write(2L); + pageWriter.write(3L); + // without page statistics + assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true)); + + // total size + assertEquals(22, publicBAOS.size()); + TimeStatistics statistics = pageWriter.getStatistics(); + assertEquals(1L, statistics.getStartTime()); + assertEquals(3L, statistics.getEndTime()); + assertEquals(3, statistics.getCount()); + ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // uncompressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer)); + // compressedSize + assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer)); + byte[] compress = new byte[20]; + compressedBuffer.get(compress); + byte[] uncompress = new byte[24]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY); + unCompressor.uncompress(compress, 0, 20, uncompress, 0); + ByteBuffer uncompressedBuffer = 
ByteBuffer.wrap(uncompress); + assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer)); + assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer)); + assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer)); + } catch (IOException e) { + fail(); + } + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java new file mode 100644 index 0000000..0be1d96 --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.tsfile.write.writer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; +import org.junit.Test; + +public class ValueChunkWriterTest { + + @Test + public void testWrite1() { + Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); + ValueChunkWriter chunkWriter = new ValueChunkWriter("s1", CompressionType.UNCOMPRESSED, + TSDataType.FLOAT, TSEncoding.PLAIN, valueEncoder); + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, time % 4 == 0); + } + chunkWriter.sealCurrentPage(); + // page without statistics size: 69 + chunk header size: 8 + assertEquals(77L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeAllPagesOfChunkToTsFile(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(69, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.FLOAT.serialize(), 
ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(69, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testWrite2() { + Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); + ValueChunkWriter chunkWriter = new ValueChunkWriter("s1", CompressionType.UNCOMPRESSED, + TSDataType.FLOAT, TSEncoding.PLAIN, valueEncoder); + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, time % 4 == 0); + } + chunkWriter.sealCurrentPage(); + for (int time = 20; time <= 40; time++) { + chunkWriter.write(time, (float) time, time % 4 == 0); + } + chunkWriter.sealCurrentPage(); + // two pages with statistics size: (69 + 41) * 2 + chunk header size: 9 + assertEquals(229L, chunkWriter.getCurrentChunkSize()); + + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + try { + chunkWriter.writeAllPagesOfChunkToTsFile(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(220, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(220, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java 
// file: tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.tsfile.write.writer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.junit.Test;

/**
 * Tests for {@link ValuePageWriter}: the uncompressed page layout
 * (size + null-bitmap + values), memory-size estimation, and page-header
 * serialization with/without statistics and with/without compression.
 */
public class ValuePageWriterTest {

  /** Tolerance for float comparisons. */
  private static final float DELTA = 0.000001f;

  /** Creates a FLOAT ValuePageWriter using PLAIN encoding and the given compression. */
  private ValuePageWriter createPageWriter(CompressionType compressionType) {
    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
    ICompressor compressor = ICompressor.getCompressor(compressionType);
    return new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
  }

  /** Writes times 1..count with value == time; every 4th point is marked null. */
  private void writeData(ValuePageWriter pageWriter, int count) throws IOException {
    for (int time = 1; time <= count; time++) {
      pageWriter.write(time, (float) time, time % 4 == 0);
    }
  }

  /** Checks the size + bitmap section for 20 points where every 4th one is null. */
  private void verifyBitmap20(ByteBuffer buffer) {
    assertEquals(20, ReadWriteIOUtils.readInt(buffer));
    // bit pattern 1110 1110 1110 1110 1110 0000 -> 0xEE 0xEE 0xE0
    assertEquals(((byte) 0xEE), ReadWriteIOUtils.readByte(buffer));
    assertEquals(((byte) 0xEE), ReadWriteIOUtils.readByte(buffer));
    assertEquals(((byte) 0xE0), ReadWriteIOUtils.readByte(buffer));
  }

  /** Reads back the non-null float values in 1..count and expects the buffer exhausted. */
  private void verifyValues(ByteBuffer buffer, int count) {
    for (int value = 1; value <= count; value++) {
      if (value % 4 != 0) {
        assertEquals((float) value, ReadWriteIOUtils.readFloat(buffer), DELTA);
      }
    }
    assertEquals(0, buffer.remaining());
  }

  /** Verifies the statistics of 20 written points: 15 non-null, last non-null time 19. */
  private void verifyStatistics(Statistics<Float> statistics) {
    assertEquals(1L, statistics.getStartTime());
    assertEquals(19L, statistics.getEndTime());
    assertEquals(15, statistics.getCount());
    assertEquals(1.0f, statistics.getFirstValue(), DELTA);
    assertEquals(19.0f, statistics.getLastValue(), DELTA);
    assertEquals(1.0f, statistics.getMinValue(), DELTA);
    assertEquals(19.0f, statistics.getMaxValue(), DELTA);
    assertEquals(150.0f, (float) statistics.getSumDoubleValue(), DELTA);
  }

  @Test
  public void testWrite1() {
    ValuePageWriter pageWriter = createPageWriter(CompressionType.UNCOMPRESSED);
    try {
      pageWriter.write(1L, 1.0f, false);
      assertEquals(9, pageWriter.estimateMaxMemSize());
      ByteBuffer buffer = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
      pageWriter.reset(TSDataType.FLOAT);
      // after reset only the empty-page overhead remains
      assertEquals(5, pageWriter.estimateMaxMemSize());
      assertEquals(1, ReadWriteIOUtils.readInt(buffer));
      // single non-null value -> only the highest bit of the bitmap is set
      assertEquals(((byte) (1 << 7)), ReadWriteIOUtils.readByte(buffer));
      assertEquals(1.0f, ReadWriteIOUtils.readFloat(buffer), DELTA);
      assertEquals(0, buffer.remaining());
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testWrite2() {
    ValuePageWriter pageWriter = createPageWriter(CompressionType.UNCOMPRESSED);
    try {
      writeData(pageWriter, 16);
      assertEquals(55, pageWriter.estimateMaxMemSize());
      ByteBuffer buffer = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
      pageWriter.reset(TSDataType.FLOAT);
      assertEquals(5, pageWriter.estimateMaxMemSize());
      assertEquals(16, ReadWriteIOUtils.readInt(buffer));
      // 16 points, every 4th null -> 1110 1110 repeated
      assertEquals(((byte) 0xEE), ReadWriteIOUtils.readByte(buffer));
      assertEquals(((byte) 0xEE), ReadWriteIOUtils.readByte(buffer));
      verifyValues(buffer, 16);
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testWrite3() {
    ValuePageWriter pageWriter = createPageWriter(CompressionType.UNCOMPRESSED);
    try {
      writeData(pageWriter, 20);
      assertEquals(67, pageWriter.estimateMaxMemSize());
      ByteBuffer buffer = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
      pageWriter.reset(TSDataType.FLOAT);
      assertEquals(5, pageWriter.estimateMaxMemSize());
      verifyBitmap20(buffer);
      verifyValues(buffer, 20);
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
    ValuePageWriter pageWriter = createPageWriter(CompressionType.UNCOMPRESSED);
    PublicBAOS publicBAOS = new PublicBAOS();
    try {
      writeData(pageWriter, 20);
      // first == true: header is written WITHOUT page statistics
      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
      // total size: 2-byte header (two varint sizes) + 67 bytes of page data
      assertEquals(69, publicBAOS.size());
      verifyStatistics((Statistics<Float>) pageWriter.getStatistics());

      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
      // uncompressedSize
      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
      // compressedSize (UNCOMPRESSED -> identical)
      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
      verifyBitmap20(buffer);
      verifyValues(buffer, 20);
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
    ValuePageWriter pageWriter = createPageWriter(CompressionType.UNCOMPRESSED);
    PublicBAOS publicBAOS = new PublicBAOS();
    try {
      writeData(pageWriter, 20);
      // first == false: page statistics ARE serialized into the header
      // (the original comment "without page statistics" was copy-pasted from the
      // first-page case and contradicted the deserialization below)
      assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
      // total size: sizes + serialized statistics + 67 bytes of page data
      assertEquals(110, publicBAOS.size());
      verifyStatistics((Statistics<Float>) pageWriter.getStatistics());

      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
      // uncompressedSize
      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
      // compressedSize
      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));

      // statistics come right after the two size fields
      FloatStatistics testStatistics =
          (FloatStatistics) FloatStatistics.deserialize(buffer, TSDataType.FLOAT);
      verifyStatistics(testStatistics);

      verifyBitmap20(buffer);
      verifyValues(buffer, 20);
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
    ValuePageWriter pageWriter = createPageWriter(CompressionType.SNAPPY);
    PublicBAOS publicBAOS = new PublicBAOS();
    try {
      writeData(pageWriter, 20);
      // first == true: header is written without page statistics
      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
      assertEquals(72, publicBAOS.size());
      verifyStatistics((Statistics<Float>) pageWriter.getStatistics());

      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
      // uncompressedSize
      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
      // compressedSize (SNAPPY happens to expand this tiny page)
      assertEquals(70, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));

      byte[] compress = new byte[70];
      buffer.get(compress);
      byte[] uncompress = new byte[67];
      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
      unCompressor.uncompress(compress, 0, 70, uncompress, 0);
      ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);

      verifyBitmap20(uncompressedBuffer);
      verifyValues(uncompressedBuffer, 20);
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }
}
// file: tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.tsfile.write.writer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
import org.junit.Test;

/**
 * Tests for {@link VectorChunkWriterImpl}: the serialized layout of a vector
 * chunk — one time chunk ("s1.time") followed by one value chunk per
 * sub-measurement (s1/FLOAT, s2/INT32, s3/DOUBLE) — for single-page and
 * multi-page chunks.
 */
public class VectorChunkWriterImplTest {

  /** Writes one row: the three sub-measurement values, then the time column. */
  private void writeRow(VectorChunkWriterImpl chunkWriter, int time) {
    chunkWriter.write(time, (float) time, false);
    chunkWriter.write(time, time, false);
    chunkWriter.write(time, (double) time, false);
    chunkWriter.write(time);
  }

  /**
   * Reads one chunk header and checks marker, measurement id, data size and
   * data type; the stub schema always uses UNCOMPRESSED compression and PLAIN
   * encoding.
   */
  private void verifyChunkHeader(
      ByteBuffer buffer, byte marker, String measurementId, int dataSize, TSDataType dataType) {
    assertEquals(marker, ReadWriteIOUtils.readByte(buffer));
    assertEquals(measurementId, ReadWriteIOUtils.readVarIntString(buffer));
    assertEquals(dataSize, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
    assertEquals(dataType.serialize(), ReadWriteIOUtils.readByte(buffer));
    assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
    assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
  }

  @Test
  public void testWrite1() {
    VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
    VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);

    for (int time = 1; time <= 20; time++) {
      writeRow(chunkWriter, time);
    }

    chunkWriter.sealCurrentPage();
    // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80;
    // value chunk 2: 8 + 2 + 4 + 3 + 20; value chunk 3: 9 + 4 + 7 + 20 * 8
    assertEquals(492L, chunkWriter.getCurrentChunkSize());

    try {
      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
      chunkWriter.writeToFileWriter(writer);
      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());

      // time chunk (marker tagged with 0x80)
      verifyChunkHeader(buffer, (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER),
          "s1.time", 164, TSDataType.Vector);
      buffer.position(buffer.position() + 164);

      // value chunk 1 (marker tagged with 0x40)
      verifyChunkHeader(buffer, (byte) (0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER),
          "s1", 89, TSDataType.FLOAT);
      buffer.position(buffer.position() + 89);

      // value chunk 2
      verifyChunkHeader(buffer, (byte) (0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER),
          "s2", 29, TSDataType.INT32);
      buffer.position(buffer.position() + 29);

      // value chunk 3 (was mislabeled "value chunk 2" — copy-paste slip)
      verifyChunkHeader(buffer, (byte) (0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER),
          "s3", 171, TSDataType.DOUBLE);
      assertEquals(171, buffer.remaining());
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testWrite2() {
    VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
    VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);

    for (int time = 1; time <= 20; time++) {
      writeRow(chunkWriter, time);
    }
    chunkWriter.sealCurrentPage();
    for (int time = 21; time <= 40; time++) {
      writeRow(chunkWriter, time);
    }
    chunkWriter.sealCurrentPage();

    // time chunk: 14 + (4 + 17 + 160) * 2
    // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2
    // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2
    // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2
    assertEquals(1259L, chunkWriter.getCurrentChunkSize());

    try {
      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
      chunkWriter.writeToFileWriter(writer);
      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());

      // time chunk (multi-page -> CHUNK_HEADER marker)
      verifyChunkHeader(buffer, (byte) (0x80 | MetaMarker.CHUNK_HEADER),
          "s1.time", 362, TSDataType.Vector);
      buffer.position(buffer.position() + 362);

      // value chunk 1
      verifyChunkHeader(buffer, (byte) (0x40 | MetaMarker.CHUNK_HEADER),
          "s1", 260, TSDataType.FLOAT);
      buffer.position(buffer.position() + 260);

      // value chunk 2
      verifyChunkHeader(buffer, (byte) (0x40 | MetaMarker.CHUNK_HEADER),
          "s2", 140, TSDataType.INT32);
      buffer.position(buffer.position() + 140);

      // value chunk 3 (was mislabeled "value chunk 2" — copy-paste slip)
      verifyChunkHeader(buffer, (byte) (0x40 | MetaMarker.CHUNK_HEADER),
          "s3", 456, TSDataType.DOUBLE);
      assertEquals(456, buffer.remaining());
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }
}
 */
// file: tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
package org.apache.iotdb.tsfile.write.writer;

import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;

/**
 * Fixed {@link IMeasurementSchema} stub for vector chunk/page writer tests: a
 * Vector measurement with id "s1.time" and three sub-measurements — s1
 * (FLOAT), s2 (INT32) and s3 (DOUBLE) — all PLAIN-encoded and uncompressed.
 * Every method returns a constant, so tests get fully deterministic layouts.
 */
public class VectorMeasurementSchemaStub implements IMeasurementSchema {

  @Override
  public String getMeasurementId() {
    // Used by the tests as the time-chunk measurement id.
    return "s1.time";
  }

  @Override
  public CompressionType getCompressor() {
    return CompressionType.UNCOMPRESSED;
  }

  @Override
  public TSDataType getType() {
    return TSDataType.Vector;
  }

  @Override
  public TSEncoding getTimeTSEncoding() {
    return TSEncoding.PLAIN;
  }

  @Override
  public Encoder getTimeEncoder() {
    // Timestamps are encoded as plain INT64 values.
    return new PlainEncoder(TSDataType.INT64, 0);
  }

  @Override
  public List<String> getValueMeasurementIdList() {
    return Arrays.asList("s1", "s2", "s3");
  }

  @Override
  public List<TSDataType> getValueTSDataTypeList() {
    // Index-aligned with getValueMeasurementIdList(): s1, s2, s3.
    return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE);
  }

  @Override
  public List<TSEncoding> getValueTSEncodingList() {
    return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
  }

  @Override
  public List<Encoder> getValueEncoderList() {
    // One plain encoder per sub-measurement, matching getValueTSDataTypeList().
    return Arrays
        .asList(new PlainEncoder(TSDataType.FLOAT, 0), new PlainEncoder(TSDataType.INT32, 0),
            new PlainEncoder(TSDataType.DOUBLE, 0));
  }
}