This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.1
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/rc/1.1 by this push:
     new dbff8287 Fixed the issue that the time of the first data item written 
to TSFile by measurement cannot be a negative number (#298)
dbff8287 is described below

commit dbff8287ffe5173f10bbc7f2513076922fc3a849
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 8 18:59:31 2024 +0800

    Fixed the issue that the time of the first data item written to TSFile by 
measurement cannot be a negative number (#298)
---
 .../java/org/apache/tsfile/write/TsFileWriter.java |   2 +-
 .../write/chunk/AlignedChunkGroupWriterImpl.java   |  12 ++-
 .../chunk/NonAlignedChunkGroupWriterImpl.java      |   3 +-
 .../apache/tsfile/write/TsFileWriteApiTest.java    | 106 +++++++++++++++++++++
 4 files changed, 118 insertions(+), 5 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index f7c2f293..d5ac5ded 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -469,7 +469,7 @@ public class TsFileWriter implements AutoCloseable {
         groupWriter = new AlignedChunkGroupWriterImpl(deviceId);
         if (!isUnseq) { // Sequence File
           ((AlignedChunkGroupWriterImpl) groupWriter)
-              .setLastTime(alignedDeviceLastTimeMap.getOrDefault(deviceId, 
-1L));
+              .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
         }
       } else {
         groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId);
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 7d2b6090..6172bc7d 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -58,7 +58,8 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
 
   private final TimeChunkWriter timeChunkWriter;
 
-  private long lastTime = -1;
+  private long lastTime = Long.MIN_VALUE;
+  private boolean isInitLastTime = false;
 
   public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
     this.deviceId = deviceId;
@@ -150,6 +151,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
     }
     timeChunkWriter.write(time);
     lastTime = time;
+    isInitLastTime = true;
     if (checkPageSizeAndMayOpenANewPage()) {
       writePageToPageBuffer();
     }
@@ -224,6 +226,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
       }
       timeChunkWriter.write(time);
       lastTime = time;
+      isInitLastTime = true;
       if (checkPageSizeAndMayOpenANewPage()) {
         writePageToPageBuffer();
       }
@@ -340,7 +343,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
   }
 
   private void checkIsHistoryData(long time) throws WriteProcessException {
-    if (time <= lastTime) {
+    if (isInitLastTime && time <= lastTime) {
       throw new WriteProcessException(
           "Not allowed to write out-of-order data in timeseries "
               + ((PlainDeviceID) deviceId).toStringID()
@@ -360,6 +363,9 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
   }
 
   public void setLastTime(Long lastTime) {
-    this.lastTime = lastTime;
+    if (lastTime != null) {
+      this.lastTime = lastTime;
+      isInitLastTime = true;
+    }
   }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 2cb3c1e4..cb42233e 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -188,7 +188,8 @@ public class NonAlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
   }
 
   private void checkIsHistoryData(String measurementId, long time) throws 
WriteProcessException {
-    if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) {
+    final Long lastTime = lastTimeMap.get(measurementId);
+    if (lastTime != null && time <= lastTime) {
       throw new WriteProcessException(
           "Not allowed to write out-of-order data in timeseries "
               + ((PlainDeviceID) deviceId).toStringID()
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 76f482c5..5586fb22 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -403,6 +403,59 @@ public class TsFileWriteApiTest {
     }
   }
 
+  @Test
+  public void writeNonAlignedWithTabletWithNegativeTimestamps() {
+    setEnv(100, 30);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, 
TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING, 
TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB, 
TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE, 
TSEncoding.PLAIN));
+
+      // register nonAligned timeseries
+      tsFileWriter.registerTimeseries(new Path(deviceId), measurementSchemas);
+
+      Tablet tablet = new Tablet(deviceId, measurementSchemas);
+      long[] timestamps = tablet.timestamps;
+      Object[] values = tablet.values;
+      tablet.initBitMaps();
+      int sensorNum = measurementSchemas.size();
+      long startTime = -100;
+      for (long r = 0; r < 10000; r++) {
+        int row = tablet.rowSize++;
+        timestamps[row] = startTime++;
+        for (int i = 0; i < sensorNum - 1; i++) {
+          if (i == 1 && r > 1000) {
+            tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
+            continue;
+          }
+          Binary[] textSensor = (Binary[]) values[i];
+          textSensor[row] = new Binary("testString.........", 
TSFileConfig.STRING_CHARSET);
+        }
+        if (r > 1000) {
+          tablet.bitMaps[sensorNum - 1].mark((int) r % 
tablet.getMaxRowNumber());
+        } else {
+          LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
+          dateSensor[row] = LocalDate.of(2024, 4, 1);
+        }
+        // write
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          tsFileWriter.write(tablet);
+          tablet.reset();
+        }
+      }
+      // write
+      if (tablet.rowSize != 0) {
+        tsFileWriter.write(tablet);
+        tablet.reset();
+      }
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Meet errors in test: " + e.getMessage());
+    }
+  }
+
   @Test
   public void writeAlignedWithTabletWithNullValue() {
     setEnv(100, 30);
@@ -456,6 +509,59 @@ public class TsFileWriteApiTest {
     }
   }
 
+  @Test
+  public void writeDataToTabletsWithNegativeTimestamps() {
+    setEnv(100, 30);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, 
TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING, 
TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB, 
TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE, 
TSEncoding.PLAIN));
+
+      // register aligned timeseries
+      tsFileWriter.registerAlignedTimeseries(new Path(deviceId), 
measurementSchemas);
+
+      Tablet tablet = new Tablet(deviceId, measurementSchemas);
+      long[] timestamps = tablet.timestamps;
+      Object[] values = tablet.values;
+      tablet.initBitMaps();
+      int sensorNum = measurementSchemas.size();
+      long startTime = -1000;
+      for (long r = 0; r < 10000; r++) {
+        int row = tablet.rowSize++;
+        timestamps[row] = startTime++;
+        for (int i = 0; i < sensorNum - 1; i++) {
+          if (i == 1 && r > 1000) {
+            tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
+            continue;
+          }
+          Binary[] textSensor = (Binary[]) values[i];
+          textSensor[row] = new Binary("testString.........", 
TSFileConfig.STRING_CHARSET);
+        }
+        if (r > 1000) {
+          tablet.bitMaps[sensorNum - 1].mark((int) r % 
tablet.getMaxRowNumber());
+        } else {
+          LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
+          dateSensor[row] = LocalDate.of(2024, 4, 1);
+        }
+        // write
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          tsFileWriter.writeAligned(tablet);
+          tablet.reset();
+        }
+      }
+      // write
+      if (tablet.rowSize != 0) {
+        tsFileWriter.writeAligned(tablet);
+        tablet.reset();
+      }
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Meet errors in test: " + e.getMessage());
+    }
+  }
+
   /** Write an empty page and then write a nonEmpty page. */
   @Test
   public void writeAlignedTimeseriesWithEmptyPage() throws IOException, 
WriteProcessException {

Reply via email to