This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 72c8c18cd5 [IOTDB-4616] Support serialization and deserialization for
confignode standalone mode (#7670)
72c8c18cd5 is described below
commit 72c8c18cd5b18015491ec7d76cbb87743ae22303
Author: ljn55966005 <[email protected]>
AuthorDate: Thu Oct 27 18:42:12 2022 +0800
[IOTDB-4616] Support serialization and deserialization for confignode
standalone mode (#7670)
---
.../statemachine/PartitionRegionStateMachine.java | 73 +++++++++-
.../iotdb/confignode/manager/ConsensusManager.java | 1 -
.../confignode/writelog/io/BatchLogReader.java | 76 ++++++++++
.../iotdb/confignode/writelog/io/ILogReader.java | 46 ++++++
.../writelog/io/SingleFileLogReader.java | 156 +++++++++++++++++++++
5 files changed, 348 insertions(+), 4 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 6eb55c8646..d4e89795cf 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -23,29 +23,43 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
+import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.utils.writelog.LogWriter;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
/** StateMachine for PartitionRegion */
public class PartitionRegionStateMachine
implements IStateMachine, IStateMachine.EventApi,
IStateMachine.RetryPolicy {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionRegionStateMachine.class);
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
private final ConfigPlanExecutor executor;
private ConfigManager configManager;
+ private LogWriter logWriter;
+ private File logFile;
+ private int logFileId;
+ private static final String fileDir = CONF.getConsensusDir();
+ private static final String filePath = fileDir + File.separator +
"log_inprogress_";
+ private static final long FILE_MAX_SIZE =
CONF.getPartitionRegionStandAloneLogSegmentSizeMax();
private final TEndPoint currentNodeTEndPoint;
public PartitionRegionStateMachine(ConfigManager configManager,
ConfigPlanExecutor executor) {
@@ -100,6 +114,7 @@ public class PartitionRegionStateMachine
LOGGER.error(e.getMessage());
result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
+
return result;
}
@@ -176,9 +191,7 @@ public class PartitionRegionStateMachine
}
@Override
- public void start() {
- // do nothing
- }
+ public void start() {}
@Override
public void stop() {
@@ -207,4 +220,58 @@ public class PartitionRegionStateMachine
// TODO implement this
return RetryPolicy.super.getSleepTime();
}
+
+ private void initStandAloneConfigNode() {
+ String[] list = new File(fileDir).list();
+ if (list != null && list.length != 0) {
+ for (String logFileName : list) {
+ File logFile = SystemFileFactory.INSTANCE.getFile(fileDir +
File.separator + logFileName);
+ SingleFileLogReader logReader;
+ try {
+ logReader = new SingleFileLogReader(logFile);
+ } catch (FileNotFoundException e) {
+ LOGGER.error(
+ "initStandAloneConfigNode meets error, can't find standalone log
files, filePath: {}",
+ logFile.getAbsolutePath(),
+ e);
+ continue;
+ }
+ while (logReader.hasNext()) {
+ // read and re-serialize the PhysicalPlan
+ ConfigPhysicalPlan nextPlan = logReader.next();
+ try {
+ executor.executeNonQueryPlan(nextPlan);
+ } catch (UnknownPhysicalPlanTypeException | AuthException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ logReader.close();
+ }
+ }
+ for (int ID = 0; ID < Integer.MAX_VALUE; ID++) {
+ File file = SystemFileFactory.INSTANCE.getFile(filePath);
+ if (!file.exists()) {
+ logFileId = ID;
+ break;
+ }
+ }
+ createLogFile(logFileId);
+ }
+
+ private void createLogFile(int logFileId) {
+ logFile = SystemFileFactory.INSTANCE.getFile(filePath + logFileId);
+ try {
+ if (logFile.createNewFile()) {
+ logWriter = new LogWriter(logFile, false);
+ LOGGER.info("Create StandaloneLog: {}", logFile.getAbsolutePath());
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Can't create StandaloneLog: {}, retrying...",
logFile.getAbsolutePath());
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ignored) {
+ // Ignore and retry
+ }
+ }
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index bd28a45680..b93b48e68a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -96,7 +96,6 @@ public class ConsensusManager {
ConsensusFactory.CONSTRUCT_FAILED_MSG,
ConsensusFactory.StandAloneConsensus)));
} else {
-
// Implement local ConsensusLayer by ConfigNodeConfig
consensusImpl =
ConsensusFactory.getConsensusImpl(
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/BatchLogReader.java
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/BatchLogReader.java
new file mode 100644
index 0000000000..b9224be411
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/BatchLogReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.writelog.io;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class BatchLogReader implements ILogReader {
+ private static Logger logger = LoggerFactory.getLogger(BatchLogReader.class);
+
+ private Iterator<ConfigPhysicalPlan> planIterator;
+
+ private boolean fileCorrupted = false;
+
+ BatchLogReader(ByteBuffer buffer) {
+ List<ConfigPhysicalPlan> logs = readLogs(buffer);
+ this.planIterator = logs.iterator();
+ }
+
+ private List<ConfigPhysicalPlan> readLogs(ByteBuffer buffer) {
+ List<ConfigPhysicalPlan> plans = new ArrayList<>();
+ while (buffer.position() != buffer.limit()) {
+ try {
+ plans.add(ConfigPhysicalPlan.Factory.create(buffer));
+ } catch (IOException e) {
+ logger.error("Cannot deserialize PhysicalPlans from ByteBuffer, ignore
remaining logs", e);
+ fileCorrupted = true;
+ break;
+ }
+ }
+ return plans;
+ }
+
+ @Override
+ public void close() {
+ // nothing to be closed
+ }
+
+ @Override
+ public boolean hasNext() {
+ return planIterator.hasNext();
+ }
+
+ @Override
+ public ConfigPhysicalPlan next() {
+ return planIterator.next();
+ }
+
+ public boolean isFileCorrupted() {
+ return fileCorrupted;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/ILogReader.java
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/ILogReader.java
new file mode 100644
index 0000000000..3a780ad3ee
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/ILogReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.writelog.io;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public interface ILogReader {
+
+ /** release resources occupied by this object, like file streams. */
+ void close();
+
+ /**
+ * return whether there exists next log to be read.
+ *
+ * @return whether there exists next log to be read.
+ * @throws IOException
+ */
+ boolean hasNext() throws FileNotFoundException;
+
+ /**
+ * return the next log read from media like a WAL file and covert it to a
PhysicalPlan.
+ *
+ * @return the next log as a PhysicalPlan
+ * @throws java.util.NoSuchElementException when there are no more logs
+ */
+ ConfigPhysicalPlan next() throws FileNotFoundException;
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
new file mode 100644
index 0000000000..f67cfdc286
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.writelog.io;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.NoSuchElementException;
+import java.util.zip.CRC32;
+
+public class SingleFileLogReader implements ILogReader {
+ private static final Logger logger =
LoggerFactory.getLogger(SingleFileLogReader.class);
+ public static final int LEAST_LOG_SIZE = 12; // size + checksum
+
+ private DataInputStream logStream;
+ private String filepath;
+
+ private byte[] buffer;
+ private CRC32 checkSummer = new CRC32();
+
+ // used to indicate the position of the broken log
+ private int idx;
+ // used to truncate the broken logs
+ private long unbrokenLogsSize = 0;
+
+ private BatchLogReader batchLogReader;
+
+ private boolean fileCorrupted = false;
+
+ public SingleFileLogReader(File logFile) throws FileNotFoundException {
+ open(logFile);
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (batchLogReader != null && batchLogReader.hasNext()) {
+ return true;
+ }
+
+ if (logStream.available() < LEAST_LOG_SIZE) {
+ return false;
+ }
+
+ int logSize = logStream.readInt();
+ if (logSize <= 0) {
+ return false;
+ }
+ buffer = new byte[logSize];
+
+ int readLen = logStream.read(buffer, 0, logSize);
+ if (readLen < logSize) {
+ throw new IOException("Reach eof");
+ }
+
+ final long checkSum = logStream.readLong();
+ checkSummer.reset();
+ checkSummer.update(buffer, 0, logSize);
+ if (checkSummer.getValue() != checkSum) {
+ throw new IOException(
+ String.format(
+ "The check sum of the No.%d log batch is incorrect! In "
+ + "file: "
+ + "%d Calculated: %d.",
+ idx, checkSum, checkSummer.getValue()));
+ }
+
+ batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
+ if (!batchLogReader.isFileCorrupted()) {
+ unbrokenLogsSize = unbrokenLogsSize + logSize + LEAST_LOG_SIZE;
+ } else {
+ truncateBrokenLogs();
+ }
+ fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
+ } catch (Exception e) {
+ logger.error(
+ "Cannot read more PhysicalPlans from {}, successfully read index is
{}. The reason is",
+ idx,
+ filepath,
+ e);
+ truncateBrokenLogs();
+ fileCorrupted = true;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ConfigPhysicalPlan next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ idx++;
+ return batchLogReader.next();
+ }
+
+ @Override
+ public void close() {
+ if (logStream != null) {
+ try {
+ logStream.close();
+ } catch (IOException e) {
+ logger.error("Cannot close log file {}", filepath, e);
+ }
+ }
+ }
+
+ public void open(File logFile) throws FileNotFoundException {
+ close();
+ logStream = new DataInputStream(new BufferedInputStream(new
FileInputStream(logFile)));
+ logger.info("open WAL file: {} size is {}", logFile.getName(),
logFile.length());
+ this.filepath = logFile.getPath();
+ idx = 0;
+ }
+
+ public boolean isFileCorrupted() {
+ return fileCorrupted;
+ }
+
+ private void truncateBrokenLogs() {
+ try (FileOutputStream outputStream = new FileOutputStream(filepath, true);
+ FileChannel channel = outputStream.getChannel()) {
+ channel.truncate(unbrokenLogsSize);
+ } catch (IOException e) {
+ logger.error("Fail to truncate log file to size {}", unbrokenLogsSize,
e);
+ }
+ }
+}