This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 4d934918 Fixed the issue where the time of the first data item written
to TSFile by measurement could not be a negative number (#297)
4d934918 is described below
commit 4d9349184087898d27a6b883c1324065cb54c23c
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 8 18:18:54 2024 +0800
Fixed the issue where the time of the first data item written to TSFile by
measurement could not be a negative number (#297)
---
.../java/org/apache/tsfile/write/TsFileWriter.java | 2 +-
.../write/chunk/AlignedChunkGroupWriterImpl.java | 12 ++-
.../chunk/NonAlignedChunkGroupWriterImpl.java | 3 +-
.../apache/tsfile/write/TsFileWriteApiTest.java | 106 +++++++++++++++++++++
4 files changed, 118 insertions(+), 5 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index 063df34f..de9d798e 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -480,7 +480,7 @@ public class TsFileWriter implements AutoCloseable {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
- .setLastTime(alignedDeviceLastTimeMap.getOrDefault(deviceId,
-1L));
+ .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
}
} else {
groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId,
encryptParam);
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index cfc42043..1047114c 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -62,7 +62,8 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
private final EncryptParameter encryprParam;
- private long lastTime = -1;
+ private long lastTime = Long.MIN_VALUE;
+ private boolean isInitLastTime = false;
public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
@@ -179,6 +180,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
timeChunkWriter.write(time);
lastTime = time;
+ isInitLastTime = true;
if (checkPageSizeAndMayOpenANewPage()) {
writePageToPageBuffer();
}
@@ -269,6 +271,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
timeChunkWriter.write(time);
lastTime = time;
+ isInitLastTime = true;
if (checkPageSizeAndMayOpenANewPage()) {
writePageToPageBuffer();
}
@@ -385,7 +388,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
private void checkIsHistoryData(long time) throws WriteProcessException {
- if (time <= lastTime) {
+ if (isInitLastTime && time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ deviceId
@@ -405,6 +408,9 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
public void setLastTime(Long lastTime) {
- this.lastTime = lastTime;
+ if (lastTime != null) {
+ this.lastTime = lastTime;
+ isInitLastTime = true;
+ }
}
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 197f165e..5d62c803 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -207,7 +207,8 @@ public class NonAlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
private void checkIsHistoryData(String measurementId, long time) throws
WriteProcessException {
- if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) {
+ final Long lastTime = lastTimeMap.get(measurementId);
+ if (lastTime != null && time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ deviceId
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 672ca2c1..e68a2f47 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -403,6 +403,59 @@ public class TsFileWriteApiTest {
}
}
+ @Test
+ public void writeNonAlignedWithTabletWithNegativeTimestamps() {
+ setEnv(100, 30);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE,
TSEncoding.PLAIN));
+
+ // register nonAligned timeseries
+ tsFileWriter.registerTimeseries(new Path(deviceId), measurementSchemas);
+
+ Tablet tablet = new Tablet(deviceId, measurementSchemas);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ tablet.initBitMaps();
+ int sensorNum = measurementSchemas.size();
+ long startTime = -100;
+ for (long r = 0; r < 10000; r++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = startTime++;
+ for (int i = 0; i < sensorNum - 1; i++) {
+ if (i == 1 && r > 1000) {
+ tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
+ continue;
+ }
+ Binary[] textSensor = (Binary[]) values[i];
+ textSensor[row] = new Binary("testString.........",
TSFileConfig.STRING_CHARSET);
+ }
+ if (r > 1000) {
+ tablet.bitMaps[sensorNum - 1].mark((int) r %
tablet.getMaxRowNumber());
+ } else {
+ LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
+ dateSensor[row] = LocalDate.of(2024, 4, 1);
+ }
+ // write
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ tsFileWriter.write(tablet);
+ tablet.reset();
+ }
+ }
+ // write
+ if (tablet.rowSize != 0) {
+ tsFileWriter.write(tablet);
+ tablet.reset();
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Meet errors in test: " + e.getMessage());
+ }
+ }
+
@Test
public void writeAlignedWithTabletWithNullValue() {
setEnv(100, 30);
@@ -456,6 +509,59 @@ public class TsFileWriteApiTest {
}
}
+ @Test
+ public void writeDataToTabletsWithNegativeTimestamps() {
+ setEnv(100, 30);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE,
TSEncoding.PLAIN));
+
+ // register aligned timeseries
+ tsFileWriter.registerAlignedTimeseries(new Path(deviceId),
measurementSchemas);
+
+ Tablet tablet = new Tablet(deviceId, measurementSchemas);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ tablet.initBitMaps();
+ int sensorNum = measurementSchemas.size();
+ long startTime = -1000;
+ for (long r = 0; r < 10000; r++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = startTime++;
+ for (int i = 0; i < sensorNum - 1; i++) {
+ if (i == 1 && r > 1000) {
+ tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
+ continue;
+ }
+ Binary[] textSensor = (Binary[]) values[i];
+ textSensor[row] = new Binary("testString.........",
TSFileConfig.STRING_CHARSET);
+ }
+ if (r > 1000) {
+ tablet.bitMaps[sensorNum - 1].mark((int) r %
tablet.getMaxRowNumber());
+ } else {
+ LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
+ dateSensor[row] = LocalDate.of(2024, 4, 1);
+ }
+ // write
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ tsFileWriter.writeAligned(tablet);
+ tablet.reset();
+ }
+ }
+ // write
+ if (tablet.rowSize != 0) {
+ tsFileWriter.writeAligned(tablet);
+ tablet.reset();
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Meet errors in test: " + e.getMessage());
+ }
+ }
+
/** Write an empty page and then write a nonEmpty page. */
@Test
public void writeAlignedTimeseriesWithEmptyPage() throws IOException,
WriteProcessException {