This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 011bdecb77c Fix WAL search index freezing (#17811)
011bdecb77c is described below
commit 011bdecb77c31e245af8baac613323e9f3b39c99
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 10:12:29 2026 +0800
Fix WAL search index freezing (#17811)
---
.../planner/plan/node/write/DeleteDataNode.java | 10 ++--
.../planner/plan/node/write/InsertRowNode.java | 6 ++-
.../planner/plan/node/write/InsertRowsNode.java | 6 ++-
.../planner/plan/node/write/InsertTabletNode.java | 13 ++++-
.../plan/planner/plan/node/write/ObjectNode.java | 10 ++--
.../plan/node/write/RelationalDeleteDataNode.java | 10 ++--
.../node/write/RelationalInsertTabletNode.java | 7 ++-
.../plan/planner/plan/node/write/SearchNode.java | 6 ++-
.../dataregion/wal/buffer/WALBuffer.java | 14 +-----
.../dataregion/wal/buffer/WALInfoEntry.java | 43 ++++++++++++++--
.../dataregion/wal/io/WALFileTest.java | 57 ++++++++++++++++++++++
11 files changed, 149 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index a9594f294bf..9193de12cf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -122,7 +122,7 @@ public class DeleteDataNode extends AbstractDeleteDataNode {
DeleteDataNode deleteDataNode =
new DeleteDataNode(new PlanNodeId(""), pathList, deleteStartTime,
deleteEndTime);
- deleteDataNode.setSearchIndex(searchIndex);
+ deleteDataNode.setSearchIndexFromWAL(searchIndex);
return deleteDataNode;
}
@@ -143,7 +143,7 @@ public class DeleteDataNode extends AbstractDeleteDataNode {
DeleteDataNode deleteDataNode =
new DeleteDataNode(new PlanNodeId(""), pathList, deleteStartTime,
deleteEndTime);
- deleteDataNode.setSearchIndex(searchIndex);
+ deleteDataNode.setSearchIndexFromWAL(searchIndex);
return deleteDataNode;
}
@@ -233,8 +233,12 @@ public class DeleteDataNode extends AbstractDeleteDataNode
{
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
+ serializeToWAL(buffer, getEncodedSearchIndex());
+ }
+
+ public void serializeToWAL(IWALByteBufferView buffer, long
encodedSearchIndex) {
buffer.putShort(PlanNodeType.DELETE_DATA.getNodeType());
- buffer.putLong(searchIndex);
+ buffer.putLong(encodedSearchIndex);
buffer.putInt(pathList.size());
for (PartialPath path : pathList) {
WALWriteUtils.write(path.getFullPath(), buffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 708dac1d33a..6edb9efed84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -620,8 +620,12 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
*/
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
+ serializeToWAL(buffer, getEncodedSearchIndex());
+ }
+
+ public void serializeToWAL(IWALByteBufferView buffer, long
encodedSearchIndex) {
buffer.putShort(getType().getNodeType());
- buffer.putLong(getEncodedSearchIndex());
+ buffer.putLong(encodedSearchIndex);
subSerialize(buffer);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 957b22cfcb2..63fed732db0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -349,8 +349,12 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
*/
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
+ serializeToWAL(buffer, getEncodedSearchIndex());
+ }
+
+ public void serializeToWAL(IWALByteBufferView buffer, long
encodedSearchIndex) {
buffer.putShort(getType().getNodeType());
- buffer.putLong(getEncodedSearchIndex());
+ buffer.putLong(encodedSearchIndex);
subSerialize(buffer);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index a00b418da9f..677befc5d1c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -900,12 +900,21 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
public void serializeToWAL(IWALByteBufferView buffer, List<int[]> rangeList)
{
+ serializeToWAL(buffer, rangeList, getEncodedSearchIndex());
+ }
+
+ public void serializeToWAL(
+ IWALByteBufferView buffer, List<int[]> rangeList, long
encodedSearchIndex) {
buffer.putShort(getType().getNodeType());
- subSerialize(buffer, rangeList);
+ subSerialize(buffer, rangeList, encodedSearchIndex);
}
void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList) {
- buffer.putLong(getEncodedSearchIndex());
+ subSerialize(buffer, rangeList, getEncodedSearchIndex());
+ }
+
+ void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList, long
encodedSearchIndex) {
+ buffer.putLong(encodedSearchIndex);
WALWriteUtils.write(targetPath.getFullPath(), buffer);
// data types are serialized in measurement schemas
writeMeasurementSchemas(buffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index bdc03cc57c4..552d26e5631 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -109,8 +109,12 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
+ serializeToWAL(buffer, getEncodedSearchIndex());
+ }
+
+ public void serializeToWAL(IWALByteBufferView buffer, long
encodedSearchIndex) {
buffer.putShort(getType().getNodeType());
- buffer.putLong(searchIndex);
+ buffer.putLong(encodedSearchIndex);
buffer.put((byte) (isEOF ? 1 : 0));
buffer.putLong(offset);
try {
@@ -138,7 +142,7 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
IObjectPath filePath =
IObjectPath.getDeserializer().deserializeFrom(stream);
int contentLength = stream.readInt();
ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength,
filePath);
- objectNode.setSearchIndex(searchIndex);
+ objectNode.setSearchIndexFromWAL(searchIndex);
return objectNode;
}
@@ -163,7 +167,7 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
}
ObjectNode objectNode = new ObjectNode(isEOF, offset, contents, filePath);
- objectNode.setSearchIndex(searchIndex);
+ objectNode.setSearchIndexFromWAL(searchIndex);
return objectNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
index 22230b6dd3d..f970ab70d28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
@@ -130,7 +130,7 @@ public class RelationalDeleteDataNode extends
AbstractDeleteDataNode {
RelationalDeleteDataNode deleteDataNode =
new RelationalDeleteDataNode(new PlanNodeId(""), modEntries,
databaseName);
- deleteDataNode.setSearchIndex(searchIndex);
+ deleteDataNode.setSearchIndexFromWAL(searchIndex);
return deleteDataNode;
}
@@ -145,7 +145,7 @@ public class RelationalDeleteDataNode extends
AbstractDeleteDataNode {
RelationalDeleteDataNode deleteDataNode =
new RelationalDeleteDataNode(new PlanNodeId(""), modEntries,
databaseName);
- deleteDataNode.setSearchIndex(searchIndex);
+ deleteDataNode.setSearchIndexFromWAL(searchIndex);
return deleteDataNode;
}
@@ -225,8 +225,12 @@ public class RelationalDeleteDataNode extends
AbstractDeleteDataNode {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
+ serializeToWAL(buffer, getEncodedSearchIndex());
+ }
+
+ public void serializeToWAL(IWALByteBufferView buffer, long
encodedSearchIndex) {
buffer.putShort(PlanNodeType.RELATIONAL_DELETE_DATA.getNodeType());
- buffer.putLong(searchIndex);
+ buffer.putLong(encodedSearchIndex);
try {
ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), buffer);
for (TableDeletionEntry modEntry : modEntries) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 3302d8d7fc4..ec20e7969da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -278,7 +278,12 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
@Override
void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList) {
- super.subSerialize(buffer, rangeList);
+ subSerialize(buffer, rangeList, getEncodedSearchIndex());
+ }
+
+ @Override
+ void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList, long
encodedSearchIndex) {
+ super.subSerialize(buffer, rangeList, encodedSearchIndex);
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] != null) {
buffer.put(columnCategories[i].getCategory());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index 975ac1f4844..48739dd27e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -67,7 +67,11 @@ public abstract class SearchNode extends WritePlanNode
implements ComparableCons
return this;
}
- protected long getEncodedSearchIndex() {
+ public long getEncodedSearchIndex() {
+ return encodeSearchIndex(searchIndex, isLastFragment);
+ }
+
+ public static long encodeSearchIndex(long searchIndex, boolean
isLastFragment) {
if (!isLastFragment) {
return searchIndex;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 674a3d47819..0bf51cbdbd8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -26,10 +26,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
import
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointManager;
@@ -338,15 +334,7 @@ public class WALBuffer extends AbstractWALBuffer {
// parse search index
long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType().needSearch()) {
- if (walEntry.getType() == WALEntryType.DELETE_DATA_NODE) {
- searchIndex = ((DeleteDataNode)
walEntry.getValue()).getSearchIndex();
- } else if (walEntry.getType() ==
WALEntryType.RELATIONAL_DELETE_DATA_NODE) {
- searchIndex = ((RelationalDeleteDataNode)
walEntry.getValue()).getSearchIndex();
- } else if (walEntry.getType() == WALEntryType.OBJECT_FILE_NODE) {
- searchIndex = ((ObjectNode) walEntry.getValue()).getSearchIndex();
- } else {
- searchIndex = ((InsertNode) walEntry.getValue()).getSearchIndex();
- }
+ searchIndex = ((WALInfoEntry) walEntry).getSearchIndex();
if (searchIndex != DEFAULT_SEARCH_INDEX) {
currentSearchIndex = searchIndex;
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
index 09f471462f4..0fa12d84a01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
@@ -22,9 +22,14 @@ package
org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -44,9 +49,11 @@ public class WALInfoEntry extends WALEntry {
// extra info for InsertTablet type value
private TabletInfo tabletInfo;
+ private final Long encodedSearchIndex;
public WALInfoEntry(long memTableId, WALEntryValue value, boolean wait) {
super(memTableId, value, wait);
+ encodedSearchIndex = freezeEncodedSearchIndex(value);
}
public WALInfoEntry(long memTableId, WALEntryValue value) {
@@ -65,6 +72,7 @@ public class WALInfoEntry extends WALEntry {
WALInfoEntry(WALEntryType type, long memTableId, WALEntryValue value) {
super(type, memTableId, value, false);
+ encodedSearchIndex = freezeEncodedSearchIndex(value);
if (value instanceof InsertTabletNode) {
tabletInfo =
new TabletInfo(
@@ -83,15 +91,26 @@ public class WALInfoEntry extends WALEntry {
buffer.putLong(memTableId);
switch (type) {
case INSERT_TABLET_NODE:
- ((InsertTabletNode) value).serializeToWAL(buffer,
tabletInfo.tabletRangeList);
+ ((InsertTabletNode) value)
+ .serializeToWAL(buffer, tabletInfo.tabletRangeList,
encodedSearchIndex);
break;
case INSERT_ROW_NODE:
+ ((InsertRowNode) value).serializeToWAL(buffer, encodedSearchIndex);
+ break;
case INSERT_ROWS_NODE:
+ ((InsertRowsNode) value).serializeToWAL(buffer, encodedSearchIndex);
+ break;
case DELETE_DATA_NODE:
+ ((DeleteDataNode) value).serializeToWAL(buffer, encodedSearchIndex);
+ break;
case RELATIONAL_DELETE_DATA_NODE:
+ ((RelationalDeleteDataNode) value).serializeToWAL(buffer,
encodedSearchIndex);
+ break;
+ case OBJECT_FILE_NODE:
+ ((ObjectNode) value).serializeToWAL(buffer, encodedSearchIndex);
+ break;
case MEMORY_TABLE_SNAPSHOT:
case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE:
- case OBJECT_FILE_NODE:
value.serializeToWAL(buffer);
break;
case MEMORY_TABLE_CHECKPOINT:
@@ -101,12 +120,25 @@ public class WALInfoEntry extends WALEntry {
}
}
+ public long getSearchIndex() {
+ return encodedSearchIndex == null
+ ? SearchNode.NO_CONSENSUS_INDEX
+ : SearchNode.extractSearchIndex(encodedSearchIndex);
+ }
+
+ private static Long freezeEncodedSearchIndex(WALEntryValue value) {
+ return value instanceof SearchNode ? ((SearchNode)
value).getEncodedSearchIndex() : null;
+ }
+
private static class TabletInfo {
// ranges of insert tablet
private final List<int[]> tabletRangeList;
public TabletInfo(List<int[]> tabletRangeList) {
- this.tabletRangeList = new ArrayList<>(tabletRangeList);
+ this.tabletRangeList = new ArrayList<>(tabletRangeList.size());
+ for (int[] range : tabletRangeList) {
+ this.tabletRangeList.add(Arrays.copyOf(range, range.length));
+ }
}
public int getRangeRowCount() {
@@ -178,7 +210,7 @@ public class WALInfoEntry extends WALEntry {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), tabletInfo);
+ return Objects.hash(super.hashCode(), tabletInfo, encodedSearchIndex);
}
@Override
@@ -187,6 +219,7 @@ public class WALInfoEntry extends WALEntry {
return false;
}
WALInfoEntry other = (WALInfoEntry) obj;
- return Objects.equals(this.tabletInfo, other.tabletInfo);
+ return Objects.equals(this.tabletInfo, other.tabletInfo)
+ && Objects.equals(this.encodedSearchIndex, other.encodedSearchIndex);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
index 8565b3ad3c5..b9e0f3cdd48 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
@@ -45,6 +45,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -56,7 +58,9 @@ import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
public class WALFileTest {
@@ -129,6 +133,59 @@ public class WALFileTest {
assertEquals(expectedWALEntries, actualWALEntries);
}
+ @Test
+ public void testWALInfoEntryFreezesSearchIndexAtCreation()
+ throws IOException, IllegalPathException {
+ int fakeMemTableId = 1;
+
+ InsertRowNode insertRowNode = getInsertRowNode(devicePath);
+ insertRowNode.setSearchIndex(99L);
+ WALInfoEntry insertRowEntry = new WALInfoEntry(fakeMemTableId,
insertRowNode);
+ insertRowNode.setSearchIndex(100L);
+ insertRowNode.setLastFragment(true);
+
+ InsertRowNode actualInsertRowNode =
+ (InsertRowNode) serializeAndDeserialize(insertRowEntry).getValue();
+ assertEquals(99L, actualInsertRowNode.getSearchIndex());
+ assertFalse(actualInsertRowNode.isLastFragment());
+
+ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath);
+ insertTabletNode.setSearchIndex(101L);
+ insertTabletNode.setLastFragment(false);
+ WALInfoEntry firstFragmentEntry =
+ new WALInfoEntry(
+ fakeMemTableId, insertTabletNode, Collections.singletonList(new
int[] {0, 2}));
+
+ insertTabletNode.setLastFragment(true);
+ WALInfoEntry lastFragmentEntry =
+ new WALInfoEntry(
+ fakeMemTableId, insertTabletNode, Collections.singletonList(new
int[] {2, 4}));
+
+ InsertTabletNode firstFragmentNode =
+ (InsertTabletNode)
serializeAndDeserialize(firstFragmentEntry).getValue();
+ InsertTabletNode lastFragmentNode =
+ (InsertTabletNode)
serializeAndDeserialize(lastFragmentEntry).getValue();
+
+ assertEquals(101L, firstFragmentNode.getSearchIndex());
+ assertFalse(firstFragmentNode.isLastFragment());
+ assertEquals(2, firstFragmentNode.getRowCount());
+ assertEquals(101L, lastFragmentNode.getSearchIndex());
+ assertTrue(lastFragmentNode.isLastFragment());
+ assertEquals(2, lastFragmentNode.getRowCount());
+ }
+
+ private static WALEntry serializeAndDeserialize(WALEntry walEntry) throws
IOException {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(walEntry.serializedSize());
+ WALByteBufferForTest buffer = new WALByteBufferForTest(byteBuffer);
+ walEntry.serialize(buffer);
+ byteBuffer.flip();
+ byte[] serializedEntry = new byte[byteBuffer.remaining()];
+ byteBuffer.get(serializedEntry);
+ try (DataInputStream stream = new DataInputStream(new
ByteArrayInputStream(serializedEntry))) {
+ return WALEntry.deserialize(stream);
+ }
+ }
+
@Test
public void testReadNotExistFile() throws IOException {
if (walFile.createNewFile()) {