This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new adb9bd2e48e Clone value array when insert non aligned tablet with null (#14643) (#14679)
adb9bd2e48e is described below
commit adb9bd2e48e3a06ea276bba738f8321470d711fc
Author: Haonan <[email protected]>
AuthorDate: Mon Jan 13 11:33:37 2025 +0800
Clone value array when insert non aligned tablet with null (#14643) (#14679)
* Clone value array when insert non aligned tablet with null
* Fix UT
* Add recover wal IT
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 5 +++
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 5 +++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 3 ++
.../org/apache/iotdb/db/it/utils/TestUtils.java | 23 ++++++++++++++
.../iotdb/session/it/IoTDBSessionSimpleIT.java | 36 +++++++++++++++++++++-
.../iotdb/db/utils/datastructure/BinaryTVList.java | 6 +++-
.../db/utils/datastructure/BooleanTVList.java | 6 +++-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 6 +++-
.../iotdb/db/utils/datastructure/FloatTVList.java | 6 +++-
.../iotdb/db/utils/datastructure/IntTVList.java | 6 +++-
.../iotdb/db/utils/datastructure/LongTVList.java | 6 +++-
.../storageengine/dataregion/DataRegionTest.java | 24 ++++++++++++---
12 files changed, 121 insertions(+), 11 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 0374091e40f..bfdbffda598 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -1030,6 +1030,11 @@ public abstract class AbstractEnv implements BaseEnv {
dataNodeWrapperList.forEach(AbstractNodeWrapper::stop);
}
+ @Override
+ public void shutdownForciblyAllDataNodes() {
+ dataNodeWrapperList.forEach(AbstractNodeWrapper::stopForcibly);
+ }
+
@Override
public void ensureNodeStatus(
final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 907fc39605d..18297649b80 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -358,6 +358,11 @@ public class RemoteServerEnv implements BaseEnv {
throw new UnsupportedOperationException();
}
+ @Override
+ public void shutdownForciblyAllDataNodes() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public int getMqttPort() {
throw new UnsupportedOperationException();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 88f9f766783..1490fb1e7e8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -248,6 +248,9 @@ public interface BaseEnv {
/** Shutdown all existed DataNodes. */
void shutdownAllDataNodes();
+ /** Shutdown forcibly all existed DataNodes. */
+ void shutdownForciblyAllDataNodes();
+
int getMqttPort();
String getIP();
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 372c9200c5e..fc21608882d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -786,4 +786,27 @@ public class TestUtils {
fail();
}
}
+
+ public static void stopForciblyAndRestartDataNodes() {
+ EnvFactory.getEnv().shutdownForciblyAllDataNodes();
+ EnvFactory.getEnv().startAllDataNodes();
+ long waitStartMS = System.currentTimeMillis();
+ long maxWaitMS = 60_000L;
+ long retryIntervalMS = 1000;
+ while (true) {
+ try (Connection connection = EnvFactory.getEnv().getConnection()) {
+ break;
+ } catch (Exception e) {
+ try {
+ Thread.sleep(retryIntervalMS);
+ } catch (InterruptedException ex) {
+ break;
+ }
+ }
+ long waited = System.currentTimeMillis() - waitStartMS;
+ if (waited > maxWaitMS) {
+ fail("Timeout while waiting for datanodes restart");
+ }
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
index 3de28346d18..09758f92800 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.session.it;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
@@ -63,6 +64,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -478,7 +480,7 @@ public class IoTDBSessionSimpleIT {
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
- public void insertTabletWithNullValuesTest() {
+ public void insertTabletWithNullValuesTest() throws InterruptedException {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
@@ -526,6 +528,38 @@ public class IoTDBSessionSimpleIT {
assertEquals(9L, field.getLongV());
}
}
+ dataSet = session.executeQueryStatement("select s3 from root.sg1.d1");
+ int result = 0;
+ assertTrue(dataSet.hasNext());
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ Field field = rowRecord.getFields().get(0);
+ // skip null value
+ if (result == 3) {
+ result++;
+ }
+ assertEquals(result++, field.getIntV());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ TimeUnit.MILLISECONDS.sleep(2000);
+
+ TestUtils.stopForciblyAndRestartDataNodes();
+
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ SessionDataSet dataSet = session.executeQueryStatement("select s3 from root.sg1.d1");
+ int result = 0;
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ Field field = rowRecord.getFields().get(0);
+ // skip null value
+ if (result == 3) {
+ result++;
+ }
+ assertEquals(result++, field.getIntV());
+ }
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index bb8cd6e513c..d30e0a9357b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -210,11 +210,15 @@ public abstract class BinaryTVList extends TVList {
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
+ // time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
+ // value array is a reference, should clone necessary value array
+ Binary[] clonedValue = new Binary[value.length];
+ System.arraycopy(value, 0, clonedValue, 0, value.length);
+ value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 00b0c3de8fd..7881da7aaa2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -169,11 +169,15 @@ public abstract class BooleanTVList extends TVList {
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
+ // time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
+ // value array is a reference, should clone necessary value array
+ boolean[] clonedValue = new boolean[value.length];
+ System.arraycopy(value, 0, clonedValue, 0, value.length);
+ value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index b7cc3336d10..0273aa21ee0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -174,11 +174,15 @@ public abstract class DoubleTVList extends TVList {
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
+ // time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
+ // value array is a reference, should clone necessary value array
+ double[] clonedValue = new double[value.length];
+ System.arraycopy(value, 0, clonedValue, 0, value.length);
+ value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 43a208ecccf..3b08f6d0e39 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -174,11 +174,15 @@ public abstract class FloatTVList extends TVList {
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
+ // time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
+ // value array is a reference, should clone necessary value array
+ float[] clonedValue = new float[value.length];
+ System.arraycopy(value, 0, clonedValue, 0, value.length);
+ value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 162f1fd1112..4fbf4ba96fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -167,11 +167,15 @@ public abstract class IntTVList extends TVList {
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
+ // time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
+ // value array is a reference, should clone necessary value array
+ int[] clonedValue = new int[value.length];
+ System.arraycopy(value, 0, clonedValue, 0, value.length);
+ value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 4c8f4ccea9e..8af3025468d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -167,11 +167,15 @@ public abstract class LongTVList extends TVList {
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
+ // time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
+ // value array is a reference, should clone necessary value array
+ long[] clonedValue = new long[value.length];
+ System.arraycopy(value, 0, clonedValue, 0, value.length);
+ value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index e7fa54e0b65..6a24c7f0f9d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -84,6 +84,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -263,8 +264,19 @@ public class DataRegionTest {
for (int r = 0; r < 100; r++) {
times[r] = r;
- ((int[]) columns[0])[r] = 1;
- ((long[]) columns[1])[r] = 1;
+ ((int[]) columns[0])[r] = r;
+ ((long[]) columns[1])[r] = r;
+ }
+
+ BitMap[] bitMaps = new BitMap[2];
+ bitMaps[0] = new BitMap(100);
+ bitMaps[1] = new BitMap(100);
+ for (int r = 0; r < 100; r++) {
+ if (r % 2 == 0) {
+ bitMaps[0].mark(r);
+ } else {
+ bitMaps[1].mark(r);
+ }
}
InsertTabletNode insertTabletNode1 =
@@ -276,11 +288,15 @@ public class DataRegionTest {
dataTypes,
measurementSchemas,
times,
- null,
+ bitMaps,
columns,
times.length);
-
+ int hashCode1 = Arrays.hashCode((int[]) columns[0]);
+ int hashCode2 = Arrays.hashCode((long[]) columns[1]);
dataRegion.insertTablet(insertTabletNode1);
+ // the hashCode should not be changed when insert
+ Assert.assertEquals(hashCode1, Arrays.hashCode((int[]) columns[0]));
+ Assert.assertEquals(hashCode2, Arrays.hashCode((long[]) columns[1]));
dataRegion.syncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {