This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bdebd1de25 [IOTDB-3684] Fail to read wal from wal file caused by FileNotFoundException (#6494)
bdebd1de25 is described below
commit bdebd1de255a70f13b28d64c72a13ae8c2f4adb6
Author: Alan Choo <[email protected]>
AuthorDate: Wed Jun 29 09:26:54 2022 +0800
[IOTDB-3684] Fail to read wal from wal file caused by FileNotFoundException (#6494)
---
.../iotdb/db/wal/buffer/AbstractWALBuffer.java | 14 ++--
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 7 +-
.../java/org/apache/iotdb/db/wal/io/WALWriter.java | 13 +++
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 92 ++++++++++++++++++++--
.../apache/iotdb/db/wal/utils/WALFileUtils.java | 22 +++---
.../iotdb/db/wal/node/ConsensusReqReaderTest.java | 28 +++----
.../org/apache/iotdb/db/wal/node/WALNodeTest.java | 6 +-
7 files changed, 139 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 17df8b56e7..51d0ff33ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.db.wal.io.ILogWriter;
import org.apache.iotdb.db.wal.io.WALWriter;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -43,10 +42,8 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
protected final AtomicLong currentWALFileVersion = new AtomicLong();
/** current search index */
protected volatile long currentSearchIndex;
- /** current search index */
- protected volatile WALFileStatus currentFileStatus;
/** current wal file log writer */
- protected volatile ILogWriter currentWALFileWriter;
+ protected volatile WALWriter currentWALFileWriter;
public AbstractWALBuffer(
String identifier, String logDirectory, long startFileVersion, long
startSearchIndex)
@@ -58,14 +55,15 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
logger.info("Create folder {} for wal node-{}'s buffer.", logDirectory,
identifier);
}
currentSearchIndex = startSearchIndex;
- currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
currentWALFileVersion.set(startFileVersion);
currentWALFileWriter =
new WALWriter(
SystemFileFactory.INSTANCE.getFile(
logDirectory,
WALFileUtils.getLogFileName(
- currentWALFileVersion.get(), currentSearchIndex,
currentFileStatus)));
+ currentWALFileVersion.get(),
+ currentSearchIndex,
+ WALFileStatus.CONTAINS_SEARCH_INDEX)));
}
@Override
@@ -99,7 +97,9 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
SystemFileFactory.INSTANCE.getFile(
logDirectory,
WALFileUtils.getLogFileName(
- currentWALFileVersion.incrementAndGet(), searchIndex,
currentFileStatus));
+ currentWALFileVersion.incrementAndGet(),
+ searchIndex,
+ WALFileStatus.CONTAINS_SEARCH_INDEX));
currentWALFileWriter = new WALWriter(nextLogFile);
logger.debug("Open new wal file {} for wal node-{}'s buffer.",
nextLogFile, identifier);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 6626ce7f91..e14ba4943d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -73,6 +73,8 @@ public class WALBuffer extends AbstractWALBuffer {
// buffer in syncing status, serializeThread makes sure no more writes to
syncingBuffer
private volatile ByteBuffer syncingBuffer;
// endregion
+ /** file status of working buffer, updating file writer's status when
syncing */
+ protected volatile WALFileStatus currentFileStatus;
/** single thread to serialize WALEntry to workingBuffer */
private final ExecutorService serializeThread;
/** single thread to sync syncingBuffer to disk */
@@ -86,6 +88,7 @@ public class WALBuffer extends AbstractWALBuffer {
String identifier, String logDirectory, long startFileVersion, long
startSearchIndex)
throws FileNotFoundException {
super(identifier, logDirectory, startFileVersion, startSearchIndex);
+ currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
allocateBuffers();
serializeThread =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -402,6 +405,8 @@ public class WALBuffer extends AbstractWALBuffer {
@Override
public void run() {
+ currentWALFileWriter.updateFileStatus(fileStatus);
+
// flush buffer to os
try {
currentWALFileWriter.write(syncingBuffer);
@@ -437,7 +442,7 @@ public class WALBuffer extends AbstractWALBuffer {
if (rollWALFileWriterListener != null
|| (forceFlag && currentWALFileWriter.size() >=
config.getWalFileSizeThresholdInByte())) {
try {
- rollLogWriter(searchIndex, fileStatus);
+ rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
if (rollWALFileWriterListener != null) {
rollWALFileWriterListener.succeed();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index 9c9e6174b3..5ce2bcb251 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -19,13 +19,26 @@
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.utils.WALFileStatus;
import java.io.File;
import java.io.FileNotFoundException;
/** WALWriter writes the binary {@link WALEntry} into .wal file. */
public class WALWriter extends LogWriter {
+ private WALFileStatus walFileStatus =
WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
+
public WALWriter(File logFile) throws FileNotFoundException {
super(logFile);
}
+
+ public void updateFileStatus(WALFileStatus walFileStatus) {
+ if (walFileStatus == WALFileStatus.CONTAINS_SEARCH_INDEX) {
+ this.walFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
+ }
+ }
+
+ public WALFileStatus getWalFileStatus() {
+ return walFileStatus;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 5d85114446..6827a97de2 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -541,6 +541,15 @@ public class WALNode implements IWALNode {
break;
}
}
+ // cannot find any in this file
+ if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ if (!tmpNodes.isEmpty()) {
+ return mergeInsertNodes(tmpNodes);
+ } else {
+ continue;
+ }
+ }
try (WALReader walReader = new WALReader(currentFiles[i])) {
while (walReader.hasNext()) {
@@ -557,6 +566,10 @@ public class WALNode implements IWALNode {
return mergeInsertNodes(tmpNodes);
}
}
+ } catch (FileNotFoundException e) {
+ logger.debug(
+ "WAL file {} has been deleted, try to call getReq({}) again.",
currentFiles[i], index);
+ return getReq(index);
} catch (Exception e) {
logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
}
@@ -588,6 +601,15 @@ public class WALNode implements IWALNode {
break;
}
}
+ // cannot find any in this file
+ if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ if (!tmpNodes.isEmpty()) {
+ result.add(mergeInsertNodes(tmpNodes));
+ } else {
+ continue;
+ }
+ }
try (WALReader walReader = new WALReader(currentFiles[i])) {
while (walReader.hasNext()) {
@@ -618,6 +640,13 @@ public class WALNode implements IWALNode {
tmpNodes = new ArrayList<>();
}
}
+ } catch (FileNotFoundException e) {
+ logger.debug(
+ "WAL file {} has been deleted, try to call getReqs({}, {}) again.",
+ currentFiles[i],
+ startIndex,
+ num);
+ return getReqs(startIndex, num);
} catch (Exception e) {
logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
}
@@ -661,9 +690,11 @@ public class WALNode implements IWALNode {
return true;
}
+ // clear outdated iterator
insertNodes.clear();
itr = null;
+ // update files to search
if (needUpdatingFilesToSearch || filesToSearch == null) {
updateFilesToSearch();
if (needUpdatingFilesToSearch) {
@@ -671,6 +702,16 @@ public class WALNode implements IWALNode {
}
}
+ // find file contains search index
+ while
(WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ currentFileIndex++;
+ if (currentFileIndex >= filesToSearch.length) {
+ needUpdatingFilesToSearch = true;
+ return false;
+ }
+ }
+
// find all insert plan of current wal file
List<InsertNode> tmpNodes = new ArrayList<>();
long targetIndex = nextSearchIndex;
@@ -697,6 +738,13 @@ public class WALNode implements IWALNode {
tmpNodes = new ArrayList<>();
}
}
+ } catch (FileNotFoundException e) {
+ logger.debug(
+ "WAL file {} has been deleted, try to find next {} again.",
+ identifier,
+ nextSearchIndex);
+ reset();
+ hasNext();
} catch (Exception e) {
logger.error("Fail to read wal from wal file {}",
filesToSearch[currentFileIndex], e);
}
@@ -707,6 +755,14 @@ public class WALNode implements IWALNode {
} else {
int fileIndex = currentFileIndex + 1;
while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+ // cannot find any in this file, find all slices of last insert plan
+ if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ }
+
try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
while (walReader.hasNext()) {
WALEntry walEntry = walReader.next();
@@ -726,6 +782,13 @@ public class WALNode implements IWALNode {
break;
}
}
+ } catch (FileNotFoundException e) {
+ logger.debug(
+ "WAL file {} has been deleted, try to find next {} again.",
+ identifier,
+ nextSearchIndex);
+ reset();
+ hasNext();
} catch (Exception e) {
logger.error("Fail to read wal from wal file {}",
filesToSearch[currentFileIndex], e);
}
@@ -764,14 +827,24 @@ public class WALNode implements IWALNode {
}
InsertNode insertNode = itr.next();
- if (insertNode.getSearchIndex() != nextSearchIndex) {
+ if (insertNode.getSearchIndex() == nextSearchIndex) {
+ nextSearchIndex++;
+ } else if (insertNode.getSearchIndex() > nextSearchIndex) {
logger.warn(
"Search index of wal node-{} are not continuously, skip from {} to
{}.",
identifier,
nextSearchIndex,
insertNode.getSearchIndex());
+ skipTo(insertNode.getSearchIndex() + 1);
+ } else {
+ logger.error(
+ "Search index of wal node-{} are out of order, {} is before {}.",
+ identifier,
+ nextSearchIndex,
+ insertNode.getSearchIndex());
+ throw new RuntimeException(
+ String.format("Search index of wal node-%s are out of order",
identifier));
}
- nextSearchIndex = insertNode.getSearchIndex() + 1;
return insertNode;
}
@@ -802,13 +875,18 @@ public class WALNode implements IWALNode {
nextSearchIndex,
targetIndex,
targetIndex);
- searchedFilesVersionId = -1;
- insertNodes.clear();
- itr = null;
}
+ reset();
nextSearchIndex = targetIndex;
- this.filesToSearch = null;
- this.currentFileIndex = -1;
+ }
+
+ /** Reset all params except nextSearchIndex */
+ private void reset() {
+ searchedFilesVersionId = -1;
+ insertNodes.clear();
+ itr = null;
+ filesToSearch = null;
+ currentFileIndex = -1;
needUpdatingFilesToSearch = true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
index 84874d09cb..e5d945024b 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -36,11 +36,11 @@ public class WALFileUtils {
* versionId is a self-incremented id number, helping to maintain the order
of wal files.
* startSearchIndex is the valid search index of last flushed wal entry.
statusCode is the. For
* example: <br>
- * _0-0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
- * _1-5-1.wal: -1, -1, -1, -1 <br>
- * _2-5-0.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
- * _3-12-0.wal: 12, 12, 12, 12, 12 <br>
- * _4-12-0.wal: 12, 13, 14, 15, 16, -1 <br>
+ * _0-0-1.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+ * _1-5-0.wal: -1, -1, -1, -1 <br>
+ * _2-5-1.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+ * _3-12-1.wal: 12, 12, 12, 12, 12 <br>
+ * _4-12-1.wal: 12, 13, 14, 15, 16, -1 <br>
*/
public static final Pattern WAL_FILE_NAME_PATTERN =
Pattern.compile(
@@ -105,13 +105,13 @@ public class WALFileUtils {
/**
* Find index of the file which probably contains target insert plan. <br>
- * Given wal files [ _0-0-0.wal, _1-5-1.wal, _2-5-0.wal, _3-12-0.wal,
_4-12-0.wal ], details as
+ * Given wal files [ _0-0-1.wal, _1-5-0.wal, _2-5-1.wal, _3-12-1.wal,
_4-12-1.wal ], details as
* below: <br>
- * _0-0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
- * _1-5-1.wal: -1, -1, -1, -1 <br>
- * _2-5-0.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
- * _3-12-0.wal: 12, 12, 12, 12, 12 <br>
- * _4-12-0.wal: 12, 13, 14, 15, 16, -1 <br>
+ * _0-0-1.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+ * _1-5-0.wal: -1, -1, -1, -1 <br>
+ * _2-5-1.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+ * _3-12-1.wal: 12, 12, 12, 12, 12 <br>
+ * _4-12-1.wal: 12, 13, 14, 15, 16, -1 <br>
* searching [1, 5] will return 0, searching [6, 12] will return 1, search
[13, infinity) will
* return 3, others will return -1
*
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index ccfefe8a48..e4603b7607 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -74,57 +74,57 @@ public class ConsensusReqReaderTest {
/**
* Generate wal files as below: <br>
- * _0-0-0.wal: 1,-1 <br>
- * _1-1-0.wal: 2,2,2 <br>
- * _2-2-0.wal: 3,3 <br>
- * _3-3-0.wal: 3,4 <br>
- * _4-4-0.wal: 4 <br>
- * _5-4-0.wal: 4,4,5 <br>
- * _6-5-0.wal: 6 <br>
+ * _0-0-1.wal: 1,-1 <br>
+ * _1-1-1.wal: 2,2,2 <br>
+ * _2-2-1.wal: 3,3 <br>
+ * _3-3-1.wal: 3,4 <br>
+ * _4-4-1.wal: 4 <br>
+ * _5-4-1.wal: 4,4,5 <br>
+ * _6-5-1.wal: 6 <br>
* 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 -
* InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode
*/
private void simulateFileScenario01() throws IllegalPathException {
InsertTabletNode insertTabletNode;
InsertRowNode insertRowNode;
- // _0-0-0.wal
+ // _0-0-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(1);
walNode.log(0, insertRowNode); // 1
insertTabletNode = getInsertTabletNode(devicePath, new long[] {2});
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // -1
walNode.rollWALFile();
- // _1-1-0.wal
+ // _1-1-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(2);
walNode.log(0, insertRowNode); // 2
walNode.log(0, insertRowNode); // 2
walNode.log(0, insertRowNode); // 2
walNode.rollWALFile();
- // _2-2-0.wal
+ // _2-2-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(3);
walNode.log(0, insertRowNode); // 3
walNode.log(0, insertRowNode); // 3
walNode.rollWALFile();
- // _3-3-0.wal
+ // _3-3-1.wal
insertRowNode.setDevicePath(new PartialPath(devicePath + "test"));
walNode.log(0, insertRowNode); // 3
insertTabletNode = getInsertTabletNode(devicePath, new long[] {4});
insertTabletNode.setSearchIndex(4);
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
walNode.rollWALFile();
- // _4-4-0.wal
+ // _4-4-1.wal
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
walNode.rollWALFile();
- // _5-4-0.wal
+ // _5-4-1.wal
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
insertTabletNode = getInsertTabletNode(devicePath, new long[] {5});
insertTabletNode.setSearchIndex(5);
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 5
walNode.rollWALFile();
- // _6-5-0.wal
+ // _6-5-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(6);
WALFlushListener walFlushListener = walNode.log(0, insertRowNode); // 6
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 62244f3fc9..25187c3039 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -259,7 +259,7 @@ public class WALNodeTest {
}
walNode.onMemTableFlushed(memTable);
walNode.onMemTableCreated(new PrimitiveMemTable(), tsFilePath);
- // check existence of _0-0-0.wal file
+ // check existence of _0-0-0.wal file and _1-0-1.wal file
assertTrue(
new File(
logDirectory
@@ -270,7 +270,7 @@ public class WALNodeTest {
new File(
logDirectory
+ File.separator
- + WALFileUtils.getLogFileName(1, 0,
WALFileStatus.CONTAINS_NONE_SEARCH_INDEX))
+ + WALFileUtils.getLogFileName(1, 0,
WALFileStatus.CONTAINS_SEARCH_INDEX))
.exists());
walNode.deleteOutdatedFiles();
assertFalse(
@@ -283,7 +283,7 @@ public class WALNodeTest {
new File(
logDirectory
+ File.separator
- + WALFileUtils.getLogFileName(1, 0,
WALFileStatus.CONTAINS_NONE_SEARCH_INDEX))
+ + WALFileUtils.getLogFileName(1, 0,
WALFileStatus.CONTAINS_SEARCH_INDEX))
.exists());
// check flush listeners
try {