This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.2 by this push:
     new b479e05197c Fix Unseq data can't update the seq data bug (#12918)
b479e05197c is described below

commit b479e05197c5c28cc5858a6b5ca39d3edea6ee0d
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Jul 12 23:43:23 2024 +0800

    Fix Unseq data can't update the seq data bug (#12918)
    
    (cherry picked from commit e8385b9f4879b32b0244c444a181db710696382c)
---
 .../apache/iotdb/db/it/IoTDBDuplicateTimeIT.java   | 88 ++++++++++++++++++++++
 .../execution/operator/source/SeriesScanUtil.java  |  2 +-
 .../read/reader/common/PriorityMergeReader.java    | 24 ++++--
 3 files changed, 106 insertions(+), 8 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java
new file mode 100644
index 00000000000..209657e5161
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBDuplicateTimeIT {
+
+  @Before
+  public void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setAvgSeriesPointNumberThreshold(2);
+    // Adjust memstable threshold size to make it flush automatically
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testDuplicateTime() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute("create timeseries root.db.d1.s1 with 
datatype=INT32,encoding=PLAIN");
+      // version-1 tsfile
+      statement.execute("insert into root.db.d1(time,s1) values (2,2)");
+      statement.execute("insert into root.db.d1(time,s1) values (3,3)");
+
+      // version-2 unseq work memtable
+      statement.execute("insert into root.db.d1(time,s1) values (2,20)");
+
+      // version-3 tsfile
+      statement.execute("insert into root.db.d1(time,s1) values (5,5)");
+      statement.execute("insert into root.db.d1(time,s1) values (6,6)");
+
+      // version-2 unseq work memtable -> unseq tsfile
+      statement.execute("insert into root.db.d1(time,s1) values (5,50)");
+
+      try (ResultSet set = statement.executeQuery("SELECT s1 FROM root.db.d1 
where time = 5")) {
+        int cnt = 0;
+        while (set.next()) {
+          assertEquals(5L, set.getLong(1));
+          assertEquals(50, set.getInt(2));
+
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 52fba27f076..9a2adb79d55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -1181,7 +1181,7 @@ public class SeriesScanUtil {
     VersionPageReader(
         QueryContext context, long version, long offset, IPageReader data, 
boolean isSeq) {
       this.context = context;
-      this.version = new PriorityMergeReader.MergeReaderPriority(version, 
offset);
+      this.version = new PriorityMergeReader.MergeReaderPriority(version, 
offset, isSeq);
       this.data = data;
       this.isSeq = isSeq;
       this.isAligned = data instanceof AlignedPageReader || data instanceof 
MemAlignedPageReader;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index 330c856ded0..e45d4c7d1e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -54,7 +54,8 @@ public class PriorityMergeReader implements IPointReader {
   public void addReader(IPointReader reader, long priority) throws IOException 
{
     if (reader.hasNextTimeValuePair()) {
       heap.add(
-          new Element(reader, reader.nextTimeValuePair(), new 
MergeReaderPriority(priority, 0)));
+          new Element(
+              reader, reader.nextTimeValuePair(), new 
MergeReaderPriority(priority, 0, false)));
     } else {
       reader.close();
     }
@@ -162,20 +163,29 @@ public class PriorityMergeReader implements IPointReader {
   }
 
   public static class MergeReaderPriority implements 
Comparable<MergeReaderPriority> {
-    long version;
-    long offset;
+    final long version;
+    final long offset;
 
-    public MergeReaderPriority(long version, long offset) {
+    final boolean isSeq;
+
+    public MergeReaderPriority(long version, long offset, boolean isSeq) {
       this.version = version;
       this.offset = offset;
+      this.isSeq = isSeq;
     }
 
     @Override
     public int compareTo(MergeReaderPriority o) {
-      if (version < o.version) {
-        return -1;
+      if (isSeq != o.isSeq) {
+        // one is seq and another is unseq, unseq always win
+        return isSeq ? -1 : 1;
+      } else {
+        // both seq or both unseq, using version + offset to compare
+        if (version < o.version) {
+          return -1;
+        }
+        return ((version > o.version) ? 1 : (Long.compare(offset, o.offset)));
       }
-      return ((version > o.version) ? 1 : (Long.compare(offset, o.offset)));
     }
 
     @Override

Reply via email to