This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rc/1.3.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.2 by this push:
new b479e05197c Fix Unseq data can't update the seq data bug (#12918)
b479e05197c is described below
commit b479e05197c5c28cc5858a6b5ca39d3edea6ee0d
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Jul 12 23:43:23 2024 +0800
Fix Unseq data can't update the seq data bug (#12918)
(cherry picked from commit e8385b9f4879b32b0244c444a181db710696382c)
---
.../apache/iotdb/db/it/IoTDBDuplicateTimeIT.java | 88 ++++++++++++++++++++++
.../execution/operator/source/SeriesScanUtil.java | 2 +-
.../read/reader/common/PriorityMergeReader.java | 24 ++++--
3 files changed, 106 insertions(+), 8 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java
new file mode 100644
index 00000000000..209657e5161
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBDuplicateTimeIT {
+
+ @Before
+ public void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setAvgSeriesPointNumberThreshold(2);
+ // Adjust memstable threshold size to make it flush automatically
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testDuplicateTime() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("create timeseries root.db.d1.s1 with
datatype=INT32,encoding=PLAIN");
+ // version-1 tsfile
+ statement.execute("insert into root.db.d1(time,s1) values (2,2)");
+ statement.execute("insert into root.db.d1(time,s1) values (3,3)");
+
+ // version-2 unseq work memtable
+ statement.execute("insert into root.db.d1(time,s1) values (2,20)");
+
+ // version-3 tsfile
+ statement.execute("insert into root.db.d1(time,s1) values (5,5)");
+ statement.execute("insert into root.db.d1(time,s1) values (6,6)");
+
+ // version-2 unseq work memtable -> unseq tsfile
+ statement.execute("insert into root.db.d1(time,s1) values (5,50)");
+
+ try (ResultSet set = statement.executeQuery("SELECT s1 FROM root.db.d1
where time = 5")) {
+ int cnt = 0;
+ while (set.next()) {
+ assertEquals(5L, set.getLong(1));
+ assertEquals(50, set.getInt(2));
+
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 52fba27f076..9a2adb79d55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -1181,7 +1181,7 @@ public class SeriesScanUtil {
VersionPageReader(
QueryContext context, long version, long offset, IPageReader data,
boolean isSeq) {
this.context = context;
- this.version = new PriorityMergeReader.MergeReaderPriority(version,
offset);
+ this.version = new PriorityMergeReader.MergeReaderPriority(version,
offset, isSeq);
this.data = data;
this.isSeq = isSeq;
this.isAligned = data instanceof AlignedPageReader || data instanceof
MemAlignedPageReader;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index 330c856ded0..e45d4c7d1e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -54,7 +54,8 @@ public class PriorityMergeReader implements IPointReader {
public void addReader(IPointReader reader, long priority) throws IOException
{
if (reader.hasNextTimeValuePair()) {
heap.add(
- new Element(reader, reader.nextTimeValuePair(), new
MergeReaderPriority(priority, 0)));
+ new Element(
+ reader, reader.nextTimeValuePair(), new
MergeReaderPriority(priority, 0, false)));
} else {
reader.close();
}
@@ -162,20 +163,29 @@ public class PriorityMergeReader implements IPointReader {
}
public static class MergeReaderPriority implements
Comparable<MergeReaderPriority> {
- long version;
- long offset;
+ final long version;
+ final long offset;
- public MergeReaderPriority(long version, long offset) {
+ final boolean isSeq;
+
+ public MergeReaderPriority(long version, long offset, boolean isSeq) {
this.version = version;
this.offset = offset;
+ this.isSeq = isSeq;
}
@Override
public int compareTo(MergeReaderPriority o) {
- if (version < o.version) {
- return -1;
+ if (isSeq != o.isSeq) {
+ // one is seq and another is unseq, unseq always win
+ return isSeq ? -1 : 1;
+ } else {
+ // both seq or both unseq, using version + offset to compare
+ if (version < o.version) {
+ return -1;
+ }
+ return ((version > o.version) ? 1 : (Long.compare(offset, o.offset)));
}
- return ((version > o.version) ? 1 : (Long.compare(offset, o.offset)));
}
@Override