This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d7396bfc349 Fix datanode stuck when meets disk error (#11172)
d7396bfc349 is described below
commit d7396bfc349876a72729dafc385776cd7556494d
Author: Haonan <[email protected]>
AuthorDate: Wed Sep 20 12:00:09 2023 +0800
Fix datanode stuck when meets disk error (#11172)
---
.../java/org/apache/iotdb/db/service/DataNode.java | 15 ++++--
.../iotdb/db/storageengine/StorageEngine.java | 10 ++--
.../db/storageengine/dataregion/DataRegion.java | 4 ++
.../dataregion/wal/recover/WALRecoverManager.java | 15 ++++--
...eCrossSpaceCompactionWithFastPerformerTest.java | 4 +-
...sSpaceCompactionWithReadPointPerformerTest.java | 4 +-
.../recover/SizeTieredCompactionRecoverTest.java | 3 +-
.../wal/recover/WALRecoverManagerTest.java | 4 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 9 ++--
.../concurrent/ExceptionalCountDownLatch.java | 54 ++++++++++++++++++++++
10 files changed, 99 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index e238bc4dc74..14bbe92a030 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -140,6 +140,9 @@ public class DataNode implements DataNodeMBean {
private static final String REGISTER_INTERRUPTION =
"Unexpected interruption when waiting to register to the cluster";
+ private boolean schemaRegionConsensusStarted = false;
+ private boolean dataRegionConsensusStarted = false;
+
private DataNode() {
// We do not init anything here, so that we can re-initialize the instance
in IT.
}
@@ -487,7 +490,9 @@ public class DataNode implements DataNodeMBean {
try {
SchemaRegionConsensusImpl.getInstance().start();
+ schemaRegionConsensusStarted = true;
DataRegionConsensusImpl.getInstance().start();
+ dataRegionConsensusStarted = true;
} catch (IOException e) {
throw new StartupException(e);
}
@@ -871,11 +876,15 @@ public class DataNode implements DataNodeMBean {
public void stop() {
deactivate();
-
+ SchemaEngine.getInstance().clear();
try {
MetricService.getInstance().stop();
- SchemaRegionConsensusImpl.getInstance().stop();
- DataRegionConsensusImpl.getInstance().stop();
+ if (schemaRegionConsensusStarted) {
+ SchemaRegionConsensusImpl.getInstance().stop();
+ }
+ if (dataRegionConsensusStarted) {
+ DataRegionConsensusImpl.getInstance().stop();
+ }
} catch (Exception e) {
logger.error("Stop data node error", e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 1493de273d7..1ee4eb12335 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -28,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -83,7 +85,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -192,7 +193,7 @@ public class StorageEngine implements IService {
isAllSgReady.set(allSgReady);
}
- public void recover() {
+ public void recover() throws StartupException {
setAllSgReady(false);
cachedThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -228,7 +229,7 @@ public class StorageEngine implements IService {
readyDataRegionNum = new AtomicInteger(0);
// init wal recover manager
WALRecoverManager.getInstance()
- .setAllDataRegionScannedLatch(new
CountDownLatch(recoverDataRegionNum));
+ .setAllDataRegionScannedLatch(new
ExceptionalCountDownLatch(recoverDataRegionNum));
for (Map.Entry<String, List<DataRegionId>> entry :
localDataRegionInfo.entrySet()) {
String sgName = entry.getKey();
for (DataRegionId dataRegionId : entry.getValue()) {
@@ -244,6 +245,7 @@ public class StorageEngine implements IService {
} catch (DataRegionException e) {
logger.error(
"Failed to recover data region {}[{}]", sgName,
dataRegionId.getId(), e);
+ return null;
}
dataRegionMap.put(dataRegionId, dataRegion);
logger.info(
@@ -283,7 +285,7 @@ public class StorageEngine implements IService {
}
@Override
- public void start() {
+ public void start() throws StartupException {
// build time Interval to divide time partition
initTimePartition();
// create systemDir
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6f33c63b42e..0794f469073 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -542,6 +542,10 @@ public class DataRegion implements IDataRegionForQuery {
updatePartitionFileVersion(partitionNum, resource.getVersion());
}
} catch (IOException e) {
+ // record this failure and count down the wal recover manager's latch, so that wal recovery aborts instead of waiting for this region
+ WALRecoverManager.getInstance()
+ .getAllDataRegionScannedLatch()
+ .countDownWithException(e.getMessage());
throw new DataRegionException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
index 44c80cad828..5ed0e19e0aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.DataRegionException;
@@ -55,7 +57,7 @@ public class WALRecoverManager {
private volatile boolean hasStarted = false;
// start recovery after all data regions have submitted unsealed zero-level
TsFiles
@SuppressWarnings("squid:S3077")
- private volatile CountDownLatch allDataRegionScannedLatch;
+ private volatile ExceptionalCountDownLatch allDataRegionScannedLatch;
// threads to recover wal nodes
private ExecutorService recoverThreadPool;
// stores all UnsealedTsFileRecoverPerformer submitted by data region
processors
@@ -64,7 +66,7 @@ public class WALRecoverManager {
private WALRecoverManager() {}
- public void recover() throws WALRecoverException {
+ public void recover() throws WALRecoverException, StartupException {
logger.info("Start recovering wal.");
try {
// collect wal nodes' information
@@ -85,6 +87,9 @@ public class WALRecoverManager {
// which means walRecoverManger.addRecoverPerformer method won't be call
anymore
try {
allDataRegionScannedLatch.await();
+ if (allDataRegionScannedLatch.hasException()) {
+ throw new
DataRegionException(allDataRegionScannedLatch.getExceptionMessage());
+ }
hasStarted = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -110,6 +115,8 @@ public class WALRecoverManager {
}
// deal with remaining TsFiles which don't have wal
asyncRecoverLeftTsFiles();
+ } catch (DataRegionException e) {
+ throw new StartupException(e.getMessage());
} catch (Exception e) {
for (UnsealedTsFileRecoverPerformer recoverPerformer :
absolutePath2RecoverPerformer.values()) {
@@ -202,11 +209,11 @@ public class WALRecoverManager {
return null;
}
- public CountDownLatch getAllDataRegionScannedLatch() {
+ public ExceptionalCountDownLatch getAllDataRegionScannedLatch() {
return allDataRegionScannedLatch;
}
- public void setAllDataRegionScannedLatch(CountDownLatch
allDataRegionScannedLatch) {
+ public void setAllDataRegionScannedLatch(ExceptionalCountDownLatch
allDataRegionScannedLatch) {
this.allDataRegionScannedLatch = allDataRegionScannedLatch;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index 800a23801bb..6cb1bf85250 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
@@ -61,7 +62,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
@@ -76,7 +76,7 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest
extends AbstractCo
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
super.setUp();
- WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new
CountDownLatch(1));
+ WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new
ExceptionalCountDownLatch(1));
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
index ebfe580ef49..b066cdbc935 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
@@ -61,7 +62,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
@@ -76,7 +76,7 @@ public class
RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
super.setUp();
- WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new
CountDownLatch(1));
+ WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new
ExceptionalCountDownLatch(1));
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
index b0ba4ae37db..3d23d55a511 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
@@ -20,6 +20,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.recover;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -175,7 +176,7 @@ public class SizeTieredCompactionRecoverTest {
/** Test when a file that is not a directory exists under virtual
storageGroup dir. */
@Test
- public void testRecoverWithUncorrectTimePartionDir() {
+ public void testRecoverWithUncorrectTimePartionDir() throws StartupException
{
StorageEngine.getInstance().start();
try {
File timePartitionDir = new File(SEQ_FILE_DIR);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
index 8777464abf4..73cb2936ade 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -72,7 +73,6 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -265,7 +265,7 @@ public class WALRecoverManagerTest {
// prepare tsFiles
List<WALRecoverListener> recoverListeners = prepareCrashedTsFile();
// recover
- recoverManager.setAllDataRegionScannedLatch(new CountDownLatch(0));
+ recoverManager.setAllDataRegionScannedLatch(new
ExceptionalCountDownLatch(0));
recoverManager.recover();
// check recover listeners
try {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 7506dc14337..9766269e420 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -269,13 +269,12 @@ public class EnvironmentUtils {
createAllDir();
- StorageEngine.getInstance().start();
-
- SchemaEngine.getInstance().init();
+ try {
+ StorageEngine.getInstance().start();
- CompactionTaskManager.getInstance().start();
+ SchemaEngine.getInstance().init();
- try {
+ CompactionTaskManager.getInstance().start();
WALManager.getInstance().start();
FlushManager.getInstance().start();
} catch (StartupException e) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ExceptionalCountDownLatch.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ExceptionalCountDownLatch.java
new file mode 100644
index 00000000000..a28665fb562
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ExceptionalCountDownLatch.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link CountDownLatch} wrapper that lets a party count down while also reporting a failure, so
+ * that the thread blocked in {@link #await()} can tell whether every party succeeded.
+ *
+ * <p>Failure is tracked by a dedicated flag rather than by the presence of a message, because
+ * exception messages (e.g. {@link Throwable#getMessage()}) may be {@code null}. When several parties
+ * report a failure, the first non-null message is retained instead of being overwritten.
+ */
+public class ExceptionalCountDownLatch {
+  private final CountDownLatch latch;
+  private final AtomicReference<String> exceptionMessage = new AtomicReference<>();
+  // Written before counting down; volatile so hasException() is also visible without await().
+  private volatile boolean failed = false;
+
+  /**
+   * Creates a latch that releases waiters after {@code count} count-downs.
+   *
+   * @param count the number of times {@link #countDown()} or {@link #countDownWithException(String)}
+   *     must be invoked before waiting threads are released
+   * @throws IllegalArgumentException if {@code count} is negative
+   */
+  public ExceptionalCountDownLatch(int count) {
+    this.latch = new CountDownLatch(count);
+  }
+
+  /** Counts down the latch on behalf of a party that completed successfully. */
+  public void countDown() {
+    latch.countDown();
+  }
+
+  /**
+   * Records a failure and counts down the latch.
+   *
+   * @param message description of the failure; may be {@code null}, in which case the failure is
+   *     still recorded (so {@link #hasException()} returns {@code true}) but carries no message
+   */
+  public void countDownWithException(String message) {
+    if (message != null) {
+      // keep the first reported message; later reports must not overwrite or erase it
+      exceptionMessage.compareAndSet(null, message);
+    }
+    failed = true;
+    countDown();
+  }
+
+  /**
+   * Blocks until the count reaches zero.
+   *
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  public void await() throws InterruptedException {
+    latch.await();
+  }
+
+  /** Returns whether any party reported a failure via {@link #countDownWithException(String)}. */
+  public boolean hasException() {
+    return failed;
+  }
+
+  /**
+   * Returns the first non-null failure message reported, or {@code null} if no failure has been
+   * reported or none of the reported failures carried a message.
+   */
+  public String getExceptionMessage() {
+    return exceptionMessage.get();
+  }
+}