This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d7396bfc349 Fix datanode stuck when meets disk error  (#11172)
d7396bfc349 is described below

commit d7396bfc349876a72729dafc385776cd7556494d
Author: Haonan <[email protected]>
AuthorDate: Wed Sep 20 12:00:09 2023 +0800

    Fix datanode stuck when meets disk error  (#11172)
---
 .../java/org/apache/iotdb/db/service/DataNode.java | 15 ++++--
 .../iotdb/db/storageengine/StorageEngine.java      | 10 ++--
 .../db/storageengine/dataregion/DataRegion.java    |  4 ++
 .../dataregion/wal/recover/WALRecoverManager.java  | 15 ++++--
 ...eCrossSpaceCompactionWithFastPerformerTest.java |  4 +-
 ...sSpaceCompactionWithReadPointPerformerTest.java |  4 +-
 .../recover/SizeTieredCompactionRecoverTest.java   |  3 +-
 .../wal/recover/WALRecoverManagerTest.java         |  4 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  9 ++--
 .../concurrent/ExceptionalCountDownLatch.java      | 54 ++++++++++++++++++++++
 10 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index e238bc4dc74..14bbe92a030 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -140,6 +140,9 @@ public class DataNode implements DataNodeMBean {
   private static final String REGISTER_INTERRUPTION =
       "Unexpected interruption when waiting to register to the cluster";
 
+  private boolean schemaRegionConsensusStarted = false;
+  private boolean dataRegionConsensusStarted = false;
+
   private DataNode() {
     // We do not init anything here, so that we can re-initialize the instance in IT.
   }
@@ -487,7 +490,9 @@ public class DataNode implements DataNodeMBean {
 
     try {
       SchemaRegionConsensusImpl.getInstance().start();
+      schemaRegionConsensusStarted = true;
       DataRegionConsensusImpl.getInstance().start();
+      dataRegionConsensusStarted = true;
     } catch (IOException e) {
       throw new StartupException(e);
     }
@@ -871,11 +876,15 @@ public class DataNode implements DataNodeMBean {
 
   public void stop() {
     deactivate();
-
+    SchemaEngine.getInstance().clear();
     try {
       MetricService.getInstance().stop();
-      SchemaRegionConsensusImpl.getInstance().stop();
-      DataRegionConsensusImpl.getInstance().stop();
+      if (schemaRegionConsensusStarted) {
+        SchemaRegionConsensusImpl.getInstance().stop();
+      }
+      if (dataRegionConsensusStarted) {
+        DataRegionConsensusImpl.getInstance().stop();
+      }
     } catch (Exception e) {
       logger.error("Stop data node error", e);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 1493de273d7..1ee4eb12335 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -28,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -83,7 +85,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -192,7 +193,7 @@ public class StorageEngine implements IService {
     isAllSgReady.set(allSgReady);
   }
 
-  public void recover() {
+  public void recover() throws StartupException {
     setAllSgReady(false);
     cachedThreadPool =
         IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -228,7 +229,7 @@ public class StorageEngine implements IService {
     readyDataRegionNum = new AtomicInteger(0);
     // init wal recover manager
     WALRecoverManager.getInstance()
-        .setAllDataRegionScannedLatch(new CountDownLatch(recoverDataRegionNum));
+        .setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(recoverDataRegionNum));
     for (Map.Entry<String, List<DataRegionId>> entry : localDataRegionInfo.entrySet()) {
       String sgName = entry.getKey();
       for (DataRegionId dataRegionId : entry.getValue()) {
@@ -244,6 +245,7 @@ public class StorageEngine implements IService {
               } catch (DataRegionException e) {
                 logger.error(
                     "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
+                return null;
               }
               dataRegionMap.put(dataRegionId, dataRegion);
               logger.info(
@@ -283,7 +285,7 @@ public class StorageEngine implements IService {
   }
 
   @Override
-  public void start() {
+  public void start() throws StartupException {
     // build time Interval to divide time partition
     initTimePartition();
     // create systemDir
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6f33c63b42e..0794f469073 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -542,6 +542,10 @@ public class DataRegion implements IDataRegionForQuery {
         updatePartitionFileVersion(partitionNum, resource.getVersion());
       }
     } catch (IOException e) {
+      // signal wal recover manager to recover this region's files
+      WALRecoverManager.getInstance()
+          .getAllDataRegionScannedLatch()
+          .countDownWithException(e.getMessage());
       throw new DataRegionException(e);
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
index 44c80cad828..5ed0e19e0aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManager.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
 
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.DataRegionException;
@@ -55,7 +57,7 @@ public class WALRecoverManager {
   private volatile boolean hasStarted = false;
   // start recovery after all data regions have submitted unsealed zero-level TsFiles
   @SuppressWarnings("squid:S3077")
-  private volatile CountDownLatch allDataRegionScannedLatch;
+  private volatile ExceptionalCountDownLatch allDataRegionScannedLatch;
   // threads to recover wal nodes
   private ExecutorService recoverThreadPool;
   // stores all UnsealedTsFileRecoverPerformer submitted by data region processors
@@ -64,7 +66,7 @@ public class WALRecoverManager {
 
   private WALRecoverManager() {}
 
-  public void recover() throws WALRecoverException {
+  public void recover() throws WALRecoverException, StartupException {
     logger.info("Start recovering wal.");
     try {
       // collect wal nodes' information
@@ -85,6 +87,9 @@ public class WALRecoverManager {
       // which means walRecoverManger.addRecoverPerformer method won't be call anymore
       try {
         allDataRegionScannedLatch.await();
+        if (allDataRegionScannedLatch.hasException()) {
+          throw new DataRegionException(allDataRegionScannedLatch.getExceptionMessage());
+        }
         hasStarted = true;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -110,6 +115,8 @@ public class WALRecoverManager {
       }
       // deal with remaining TsFiles which don't have wal
       asyncRecoverLeftTsFiles();
+    } catch (DataRegionException e) {
+      throw new StartupException(e.getMessage());
     } catch (Exception e) {
       for (UnsealedTsFileRecoverPerformer recoverPerformer :
           absolutePath2RecoverPerformer.values()) {
@@ -202,11 +209,11 @@ public class WALRecoverManager {
     return null;
   }
 
-  public CountDownLatch getAllDataRegionScannedLatch() {
+  public ExceptionalCountDownLatch getAllDataRegionScannedLatch() {
     return allDataRegionScannedLatch;
   }
 
-  public void setAllDataRegionScannedLatch(CountDownLatch allDataRegionScannedLatch) {
+  public void setAllDataRegionScannedLatch(ExceptionalCountDownLatch allDataRegionScannedLatch) {
     this.allDataRegionScannedLatch = allDataRegionScannedLatch;
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index 800a23801bb..6cb1bf85250 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
 
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
@@ -61,7 +62,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
@@ -76,7 +76,7 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
   public void setUp()
       throws IOException, WriteProcessException, MetadataException, InterruptedException {
     super.setUp();
-    WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new CountDownLatch(1));
+    WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(1));
     IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
     Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
index ebfe580ef49..b066cdbc935 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
 
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
@@ -61,7 +62,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
@@ -76,7 +76,7 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
   public void setUp()
       throws IOException, WriteProcessException, MetadataException, InterruptedException {
     super.setUp();
-    WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new CountDownLatch(1));
+    WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(1));
     IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
     Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
index b0ba4ae37db..3d23d55a511 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.recover;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -175,7 +176,7 @@ public class SizeTieredCompactionRecoverTest {
 
   /** Test when a file that is not a directory exists under virtual storageGroup dir. */
   @Test
-  public void testRecoverWithUncorrectTimePartionDir() {
+  public void testRecoverWithUncorrectTimePartionDir() throws StartupException {
     StorageEngine.getInstance().start();
     try {
       File timePartitionDir = new File(SEQ_FILE_DIR);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
index 8777464abf4..73cb2936ade 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
 
+import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -72,7 +73,6 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -265,7 +265,7 @@ public class WALRecoverManagerTest {
     // prepare tsFiles
     List<WALRecoverListener> recoverListeners = prepareCrashedTsFile();
     // recover
-    recoverManager.setAllDataRegionScannedLatch(new CountDownLatch(0));
+    recoverManager.setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(0));
     recoverManager.recover();
     // check recover listeners
     try {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 7506dc14337..9766269e420 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -269,13 +269,12 @@ public class EnvironmentUtils {
 
     createAllDir();
 
-    StorageEngine.getInstance().start();
-
-    SchemaEngine.getInstance().init();
+    try {
+      StorageEngine.getInstance().start();
 
-    CompactionTaskManager.getInstance().start();
+      SchemaEngine.getInstance().init();
 
-    try {
+      CompactionTaskManager.getInstance().start();
       WALManager.getInstance().start();
       FlushManager.getInstance().start();
     } catch (StartupException e) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ExceptionalCountDownLatch.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ExceptionalCountDownLatch.java
new file mode 100644
index 00000000000..a28665fb562
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ExceptionalCountDownLatch.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** This class supports passing exception message when using CountDownLatch. */
+public class ExceptionalCountDownLatch {
+  private final CountDownLatch latch;
+  private final AtomicReference<String> exceptionMessage = new AtomicReference<>();
+
+  public ExceptionalCountDownLatch(int count) {
+    this.latch = new CountDownLatch(count);
+  }
+
+  public void countDown() {
+    latch.countDown();
+  }
+
+  public void countDownWithException(String message) {
+    exceptionMessage.set(message);
+    countDown();
+  }
+
+  public void await() throws InterruptedException {
+    latch.await();
+  }
+
+  public boolean hasException() {
+    return exceptionMessage.get() != null;
+  }
+
+  public String getExceptionMessage() {
+    return exceptionMessage.get();
+  }
+}

Reply via email to