This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.1
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/rc/1.1 by this push:
new dbff8287 Fixed the issue that the time of the first data item written
to TSFile by measurement cannot be a negative number (#298)
dbff8287 is described below
commit dbff8287ffe5173f10bbc7f2513076922fc3a849
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 8 18:59:31 2024 +0800
Fixed the issue that the time of the first data item written to TSFile by
measurement cannot be a negative number (#298)
---
.../java/org/apache/tsfile/write/TsFileWriter.java | 2 +-
.../write/chunk/AlignedChunkGroupWriterImpl.java | 12 ++-
.../chunk/NonAlignedChunkGroupWriterImpl.java | 3 +-
.../apache/tsfile/write/TsFileWriteApiTest.java | 106 +++++++++++++++++++++
4 files changed, 118 insertions(+), 5 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index f7c2f293..d5ac5ded 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -469,7 +469,7 @@ public class TsFileWriter implements AutoCloseable {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
- .setLastTime(alignedDeviceLastTimeMap.getOrDefault(deviceId,
-1L));
+ .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
}
} else {
groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId);
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 7d2b6090..6172bc7d 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -58,7 +58,8 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
private final TimeChunkWriter timeChunkWriter;
- private long lastTime = -1;
+ private long lastTime = Long.MIN_VALUE;
+ private boolean isInitLastTime = false;
public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
@@ -150,6 +151,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
timeChunkWriter.write(time);
lastTime = time;
+ isInitLastTime = true;
if (checkPageSizeAndMayOpenANewPage()) {
writePageToPageBuffer();
}
@@ -224,6 +226,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
timeChunkWriter.write(time);
lastTime = time;
+ isInitLastTime = true;
if (checkPageSizeAndMayOpenANewPage()) {
writePageToPageBuffer();
}
@@ -340,7 +343,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
private void checkIsHistoryData(long time) throws WriteProcessException {
- if (time <= lastTime) {
+ if (isInitLastTime && time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ ((PlainDeviceID) deviceId).toStringID()
@@ -360,6 +363,9 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
public void setLastTime(Long lastTime) {
- this.lastTime = lastTime;
+ if (lastTime != null) {
+ this.lastTime = lastTime;
+ isInitLastTime = true;
+ }
}
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 2cb3c1e4..cb42233e 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -188,7 +188,8 @@ public class NonAlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
}
private void checkIsHistoryData(String measurementId, long time) throws
WriteProcessException {
- if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) {
+ final Long lastTime = lastTimeMap.get(measurementId);
+ if (lastTime != null && time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ ((PlainDeviceID) deviceId).toStringID()
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 76f482c5..5586fb22 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -403,6 +403,59 @@ public class TsFileWriteApiTest {
}
}
+ @Test
+ public void writeNonAlignedWithTabletWithNegativeTimestamps() {
+ setEnv(100, 30);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE,
TSEncoding.PLAIN));
+
+ // register nonAligned timeseries
+ tsFileWriter.registerTimeseries(new Path(deviceId), measurementSchemas);
+
+ Tablet tablet = new Tablet(deviceId, measurementSchemas);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ tablet.initBitMaps();
+ int sensorNum = measurementSchemas.size();
+ long startTime = -100;
+ for (long r = 0; r < 10000; r++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = startTime++;
+ for (int i = 0; i < sensorNum - 1; i++) {
+ if (i == 1 && r > 1000) {
+ tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
+ continue;
+ }
+ Binary[] textSensor = (Binary[]) values[i];
+ textSensor[row] = new Binary("testString.........",
TSFileConfig.STRING_CHARSET);
+ }
+ if (r > 1000) {
+ tablet.bitMaps[sensorNum - 1].mark((int) r %
tablet.getMaxRowNumber());
+ } else {
+ LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
+ dateSensor[row] = LocalDate.of(2024, 4, 1);
+ }
+ // write
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ tsFileWriter.write(tablet);
+ tablet.reset();
+ }
+ }
+ // write
+ if (tablet.rowSize != 0) {
+ tsFileWriter.write(tablet);
+ tablet.reset();
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Meet errors in test: " + e.getMessage());
+ }
+ }
+
@Test
public void writeAlignedWithTabletWithNullValue() {
setEnv(100, 30);
@@ -456,6 +509,59 @@ public class TsFileWriteApiTest {
}
}
+ @Test
+ public void writeDataToTabletsWithNegativeTimestamps() {
+ setEnv(100, 30);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB,
TSEncoding.PLAIN));
+ measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE,
TSEncoding.PLAIN));
+
+ // register aligned timeseries
+ tsFileWriter.registerAlignedTimeseries(new Path(deviceId),
measurementSchemas);
+
+ Tablet tablet = new Tablet(deviceId, measurementSchemas);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ tablet.initBitMaps();
+ int sensorNum = measurementSchemas.size();
+ long startTime = -1000;
+ for (long r = 0; r < 10000; r++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = startTime++;
+ for (int i = 0; i < sensorNum - 1; i++) {
+ if (i == 1 && r > 1000) {
+ tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
+ continue;
+ }
+ Binary[] textSensor = (Binary[]) values[i];
+ textSensor[row] = new Binary("testString.........",
TSFileConfig.STRING_CHARSET);
+ }
+ if (r > 1000) {
+ tablet.bitMaps[sensorNum - 1].mark((int) r %
tablet.getMaxRowNumber());
+ } else {
+ LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
+ dateSensor[row] = LocalDate.of(2024, 4, 1);
+ }
+ // write
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ tsFileWriter.writeAligned(tablet);
+ tablet.reset();
+ }
+ }
+ // write
+ if (tablet.rowSize != 0) {
+ tsFileWriter.writeAligned(tablet);
+ tablet.reset();
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Meet errors in test: " + e.getMessage());
+ }
+ }
+
/** Write an empty page and then write a nonEmpty page. */
@Test
public void writeAlignedTimeseriesWithEmptyPage() throws IOException,
WriteProcessException {