This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 72c8c18cd5 [IOTDB-4616] Support serialization and deserialization for 
confignode standalone mode (#7670)
72c8c18cd5 is described below

commit 72c8c18cd5b18015491ec7d76cbb87743ae22303
Author: ljn55966005 <[email protected]>
AuthorDate: Thu Oct 27 18:42:12 2022 +0800

    [IOTDB-4616] Support serialization and deserialization for confignode 
standalone mode (#7670)
---
 .../statemachine/PartitionRegionStateMachine.java  |  73 +++++++++-
 .../iotdb/confignode/manager/ConsensusManager.java |   1 -
 .../confignode/writelog/io/BatchLogReader.java     |  76 ++++++++++
 .../iotdb/confignode/writelog/io/ILogReader.java   |  46 ++++++
 .../writelog/io/SingleFileLogReader.java           | 156 +++++++++++++++++++++
 5 files changed, 348 insertions(+), 4 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 6eb55c8646..d4e89795cf 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -23,29 +23,43 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
+import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.utils.writelog.LogWriter;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 /** StateMachine for PartitionRegion */
 public class PartitionRegionStateMachine
     implements IStateMachine, IStateMachine.EventApi, 
IStateMachine.RetryPolicy {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionRegionStateMachine.class);
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
+  private LogWriter logWriter;
+  private File logFile;
+  private int logFileId;
+  private static final String fileDir = CONF.getConsensusDir();
+  private static final String filePath = fileDir + File.separator + 
"log_inprogress_";
+  private static final long FILE_MAX_SIZE = 
CONF.getPartitionRegionStandAloneLogSegmentSizeMax();
   private final TEndPoint currentNodeTEndPoint;
 
   public PartitionRegionStateMachine(ConfigManager configManager, 
ConfigPlanExecutor executor) {
@@ -100,6 +114,7 @@ public class PartitionRegionStateMachine
       LOGGER.error(e.getMessage());
       result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
+
     return result;
   }
 
@@ -176,9 +191,7 @@ public class PartitionRegionStateMachine
   }
 
   @Override
-  public void start() {
-    // do nothing
-  }
+  public void start() {}
 
   @Override
   public void stop() {
@@ -207,4 +220,58 @@ public class PartitionRegionStateMachine
     // TODO implement this
     return RetryPolicy.super.getSleepTime();
   }
+
+  private void initStandAloneConfigNode() {
+    String[] list = new File(fileDir).list();
+    if (list != null && list.length != 0) {
+      for (String logFileName : list) {
+        File logFile = SystemFileFactory.INSTANCE.getFile(fileDir + 
File.separator + logFileName);
+        SingleFileLogReader logReader;
+        try {
+          logReader = new SingleFileLogReader(logFile);
+        } catch (FileNotFoundException e) {
+          LOGGER.error(
+              "initStandAloneConfigNode meets error, can't find standalone log 
files, filePath: {}",
+              logFile.getAbsolutePath(),
+              e);
+          continue;
+        }
+        while (logReader.hasNext()) {
+          // read and re-serialize the PhysicalPlan
+          ConfigPhysicalPlan nextPlan = logReader.next();
+          try {
+            executor.executeNonQueryPlan(nextPlan);
+          } catch (UnknownPhysicalPlanTypeException | AuthException e) {
+            LOGGER.error(e.getMessage());
+          }
+        }
+        logReader.close();
+      }
+    }
+    for (int ID = 0; ID < Integer.MAX_VALUE; ID++) {
+      File file = SystemFileFactory.INSTANCE.getFile(filePath);
+      if (!file.exists()) {
+        logFileId = ID;
+        break;
+      }
+    }
+    createLogFile(logFileId);
+  }
+
+  private void createLogFile(int logFileId) {
+    logFile = SystemFileFactory.INSTANCE.getFile(filePath + logFileId);
+    try {
+      if (logFile.createNewFile()) {
+        logWriter = new LogWriter(logFile, false);
+        LOGGER.info("Create StandaloneLog: {}", logFile.getAbsolutePath());
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Can't create StandaloneLog: {}, retrying...", 
logFile.getAbsolutePath());
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException ignored) {
+        // Ignore and retry
+      }
+    }
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index bd28a45680..b93b48e68a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -96,7 +96,6 @@ public class ConsensusManager {
                               ConsensusFactory.CONSTRUCT_FAILED_MSG,
                               ConsensusFactory.StandAloneConsensus)));
     } else {
-
       // Implement local ConsensusLayer by ConfigNodeConfig
       consensusImpl =
           ConsensusFactory.getConsensusImpl(
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/BatchLogReader.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/BatchLogReader.java
new file mode 100644
index 0000000000..b9224be411
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/BatchLogReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.writelog.io;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class BatchLogReader implements ILogReader {
+  private static Logger logger = LoggerFactory.getLogger(BatchLogReader.class);
+
+  private Iterator<ConfigPhysicalPlan> planIterator;
+
+  private boolean fileCorrupted = false;
+
+  BatchLogReader(ByteBuffer buffer) {
+    List<ConfigPhysicalPlan> logs = readLogs(buffer);
+    this.planIterator = logs.iterator();
+  }
+
+  private List<ConfigPhysicalPlan> readLogs(ByteBuffer buffer) {
+    List<ConfigPhysicalPlan> plans = new ArrayList<>();
+    while (buffer.position() != buffer.limit()) {
+      try {
+        plans.add(ConfigPhysicalPlan.Factory.create(buffer));
+      } catch (IOException e) {
+        logger.error("Cannot deserialize PhysicalPlans from ByteBuffer, ignore 
remaining logs", e);
+        fileCorrupted = true;
+        break;
+      }
+    }
+    return plans;
+  }
+
+  @Override
+  public void close() {
+    // nothing to be closed
+  }
+
+  @Override
+  public boolean hasNext() {
+    return planIterator.hasNext();
+  }
+
+  @Override
+  public ConfigPhysicalPlan next() {
+    return planIterator.next();
+  }
+
+  public boolean isFileCorrupted() {
+    return fileCorrupted;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/ILogReader.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/ILogReader.java
new file mode 100644
index 0000000000..3a780ad3ee
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/ILogReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.writelog.io;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public interface ILogReader {
+
+  /** release resources occupied by this object, like file streams. */
+  void close();
+
+  /**
+   * return whether there exists next log to be read.
+   *
+   * @return whether there exists next log to be read.
+   * @throws IOException
+   */
+  boolean hasNext() throws FileNotFoundException;
+
+  /**
+   * return the next log read from media like a WAL file and covert it to a 
PhysicalPlan.
+   *
+   * @return the next log as a PhysicalPlan
+   * @throws java.util.NoSuchElementException when there are no more logs
+   */
+  ConfigPhysicalPlan next() throws FileNotFoundException;
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
new file mode 100644
index 0000000000..f67cfdc286
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.writelog.io;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.NoSuchElementException;
+import java.util.zip.CRC32;
+
+public class SingleFileLogReader implements ILogReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(SingleFileLogReader.class);
+  public static final int LEAST_LOG_SIZE = 12; // size + checksum
+
+  private DataInputStream logStream;
+  private String filepath;
+
+  private byte[] buffer;
+  private CRC32 checkSummer = new CRC32();
+
+  // used to indicate the position of the broken log
+  private int idx;
+  // used to truncate the broken logs
+  private long unbrokenLogsSize = 0;
+
+  private BatchLogReader batchLogReader;
+
+  private boolean fileCorrupted = false;
+
+  public SingleFileLogReader(File logFile) throws FileNotFoundException {
+    open(logFile);
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      if (batchLogReader != null && batchLogReader.hasNext()) {
+        return true;
+      }
+
+      if (logStream.available() < LEAST_LOG_SIZE) {
+        return false;
+      }
+
+      int logSize = logStream.readInt();
+      if (logSize <= 0) {
+        return false;
+      }
+      buffer = new byte[logSize];
+
+      int readLen = logStream.read(buffer, 0, logSize);
+      if (readLen < logSize) {
+        throw new IOException("Reach eof");
+      }
+
+      final long checkSum = logStream.readLong();
+      checkSummer.reset();
+      checkSummer.update(buffer, 0, logSize);
+      if (checkSummer.getValue() != checkSum) {
+        throw new IOException(
+            String.format(
+                "The check sum of the No.%d log batch is incorrect! In "
+                    + "file: "
+                    + "%d Calculated: %d.",
+                idx, checkSum, checkSummer.getValue()));
+      }
+
+      batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
+      if (!batchLogReader.isFileCorrupted()) {
+        unbrokenLogsSize = unbrokenLogsSize + logSize + LEAST_LOG_SIZE;
+      } else {
+        truncateBrokenLogs();
+      }
+      fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
+    } catch (Exception e) {
+      logger.error(
+          "Cannot read more PhysicalPlans from {}, successfully read index is 
{}. The reason is",
+          idx,
+          filepath,
+          e);
+      truncateBrokenLogs();
+      fileCorrupted = true;
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public ConfigPhysicalPlan next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    idx++;
+    return batchLogReader.next();
+  }
+
+  @Override
+  public void close() {
+    if (logStream != null) {
+      try {
+        logStream.close();
+      } catch (IOException e) {
+        logger.error("Cannot close log file {}", filepath, e);
+      }
+    }
+  }
+
+  public void open(File logFile) throws FileNotFoundException {
+    close();
+    logStream = new DataInputStream(new BufferedInputStream(new 
FileInputStream(logFile)));
+    logger.info("open WAL file: {} size is {}", logFile.getName(), 
logFile.length());
+    this.filepath = logFile.getPath();
+    idx = 0;
+  }
+
+  public boolean isFileCorrupted() {
+    return fileCorrupted;
+  }
+
+  private void truncateBrokenLogs() {
+    try (FileOutputStream outputStream = new FileOutputStream(filepath, true);
+        FileChannel channel = outputStream.getChannel()) {
+      channel.truncate(unbrokenLogsSize);
+    } catch (IOException e) {
+      logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, 
e);
+    }
+  }
+}

Reply via email to