This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 81a9284305 [improvement][refactor](image) refactor the read and load method of meta image #10005
81a9284305 is described below
commit 81a92843051f7a2a4102a1e1f95bca3b42513115
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Jun 10 14:56:14 2022 +0800
[improvement][refactor](image) refactor the read and load method of meta image #10005
---
.../src/main/java/org/apache/doris/PaloFe.java | 2 +-
.../java/org/apache/doris/catalog/Catalog.java | 17 +-
.../java/org/apache/doris/common/FeConstants.java | 2 +
.../doris/datasource/InternalDataSource.java | 4 +
.../{common => persist/meta}/FeMetaFormat.java | 2 +-
.../doris/{common => persist/meta}/MetaFooter.java | 13 +-
.../doris/{common => persist/meta}/MetaHeader.java | 2 +-
.../doris/{common => persist/meta}/MetaIndex.java | 2 +-
.../{common => persist/meta}/MetaJsonHeader.java | 3 +-
.../{common => persist/meta}/MetaMagicNumber.java | 4 +-
.../doris/persist/meta/MetaPersistMethod.java | 203 +++++++++++++++++++++
.../doris/{common => persist/meta}/MetaReader.java | 54 +++---
.../doris/{common => persist/meta}/MetaWriter.java | 52 +++---
.../doris/persist/meta/PersistMetaModules.java | 56 ++++++
.../java/org/apache/doris/catalog/CatalogTest.java | 2 +-
.../java/org/apache/doris/qe/VariableMgrTest.java | 1 +
16 files changed, 344 insertions(+), 75 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index 32ebb5d691..0d1ec31434 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -22,7 +22,6 @@ import org.apache.doris.common.CommandLineOptions;
import org.apache.doris.common.Config;
import org.apache.doris.common.LdapConfig;
import org.apache.doris.common.Log4jConfig;
-import org.apache.doris.common.MetaReader;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.JdkUtils;
@@ -31,6 +30,7 @@ import org.apache.doris.httpv2.HttpServer;
import org.apache.doris.journal.bdbje.BDBDebugger;
import org.apache.doris.journal.bdbje.BDBTool;
import org.apache.doris.journal.bdbje.BDBToolOptions;
+import org.apache.doris.persist.meta.MetaReader;
import org.apache.doris.qe.QeService;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FeServer;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index a42add1079..d051e8afbf 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -98,10 +98,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.MetaHeader;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.MetaReader;
-import org.apache.doris.common.MetaWriter;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
@@ -187,6 +184,9 @@ import org.apache.doris.persist.StorageInfoV2;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TruncateTableInfo;
+import org.apache.doris.persist.meta.MetaHeader;
+import org.apache.doris.persist.meta.MetaReader;
+import org.apache.doris.persist.meta.MetaWriter;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.PluginMgr;
import org.apache.doris.policy.PolicyMgr;
@@ -237,7 +237,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import java.io.BufferedReader;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -1631,6 +1630,10 @@ public class Catalog {
return newChecksum;
}
+ public long loadBackends(DataInputStream dis, long checksum) throws
IOException {
+ return systemInfo.loadBackends(dis, checksum);
+ }
+
public long loadDb(DataInputStream dis, long checksum) throws IOException,
DdlException {
return getInternalDataSource().loadDb(dis, checksum);
}
@@ -1940,6 +1943,10 @@ public class Catalog {
return checksum;
}
+ public long saveBackends(CountingDataOutputStream dos, long checksum)
throws IOException {
+ return systemInfo.saveBackends(dos, checksum);
+ }
+
public long saveDb(CountingDataOutputStream dos, long checksum) throws
IOException {
return getInternalDataSource().saveDb(dos, checksum);
}
@@ -4668,7 +4675,7 @@ public class Catalog {
pluginMgr.installPlugin(stmt);
}
- public long savePlugins(DataOutputStream dos, long checksum) throws
IOException {
+ public long savePlugins(CountingDataOutputStream dos, long checksum)
throws IOException {
Catalog.getCurrentPluginMgr().write(dos);
return checksum;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 27ffda6d87..4cb2bd877b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -17,6 +17,8 @@
package org.apache.doris.common;
+import org.apache.doris.persist.meta.FeMetaFormat;
+
public class FeConstants {
// Database and table's default configurations, we will never change them
public static short default_replication_num = 3;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index f421c59e9d..7068cd92d9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -3208,6 +3208,10 @@ public class InternalDataSource implements DataSourceIf {
}
Catalog.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId());
}
+ // ATTN: this should be done after load Db, and before loadAlterJob
+ recreateTabletInvertIndex();
+ // rebuild es state
+ getEsRepository().loadTableFromCatalog();
LOG.info("finished replay databases from image");
return newChecksum;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaFormat.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/FeMetaFormat.java
similarity index 97%
rename from fe/fe-core/src/main/java/org/apache/doris/common/FeMetaFormat.java
rename to
fe/fe-core/src/main/java/org/apache/doris/persist/meta/FeMetaFormat.java
index 7c216e82c0..1c0eb8fc73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaFormat.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/FeMetaFormat.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
public enum FeMetaFormat {
COR1("COR1", "v1");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaFooter.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaFooter.java
similarity index 91%
rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaFooter.java
rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaFooter.java
index 1e9dcace46..bc35027208 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaFooter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaFooter.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@@ -107,4 +107,15 @@ public class MetaFooter {
this.length = length;
}
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("checksum: ").append(checksum);
+ sb.append("\nlength: ").append(length);
+ sb.append("\nindices:");
+ for (MetaIndex metaIndex : metaIndices) {
+ sb.append("\n\t").append(metaIndex.toString());
+ }
+ return sb.toString();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaHeader.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaHeader.java
similarity index 99%
rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaHeader.java
rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaHeader.java
index f96bfc7f9e..0528f2dc2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaHeader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaHeader.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaIndex.java
similarity index 97%
rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaIndex.java
rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaIndex.java
index 399634ea28..2c5a8ace51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaIndex.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
import org.apache.doris.common.io.Text;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaJsonHeader.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaJsonHeader.java
similarity index 96%
rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaJsonHeader.java
rename to
fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaJsonHeader.java
index 527af9558f..c59b47ad73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaJsonHeader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaJsonHeader.java
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.codehaus.jackson.map.ObjectMapper;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaMagicNumber.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaMagicNumber.java
similarity index 95%
rename from
fe/fe-core/src/main/java/org/apache/doris/common/MetaMagicNumber.java
rename to
fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaMagicNumber.java
index 204c5bb588..6e8af5912c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaMagicNumber.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaMagicNumber.java
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
+
+import org.apache.doris.common.FeConstants;
import java.io.IOException;
import java.io.RandomAccessFile;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
new file mode 100644
index 0000000000..16e9de59e0
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist.meta;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.io.CountingDataOutputStream;
+
+import java.io.DataInputStream;
+import java.lang.reflect.Method;
+
+/**
+ * Defines a write and read method for the metadata module
+ * that needs to be persisted to the image.
+ */
+public class MetaPersistMethod {
+ public String name;
+ public Method readMethod;
+ public Method writeMethod;
+
+ public MetaPersistMethod(String name) {
+ this.name = name;
+ }
+
+ /**
+ * All meta modules should be added to this method.
+ * Modules' names are defined in {@link PersistMetaModules}
+ *
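+ * @param name the module name, as listed in {@link PersistMetaModules}
+ * @return a MetaPersistMethod for the name; its read/write methods stay unset if the name is not recognized
+ * @throws NoSuchMethodException if Catalog does not declare the expected load/save method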
+ */
+ public static MetaPersistMethod create(String name) throws
NoSuchMethodException {
+ MetaPersistMethod metaPersistMethod = new MetaPersistMethod(name);
+ switch (name) {
+ case "masterInfo":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadMasterInfo",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveMasterInfo",
CountingDataOutputStream.class, long.class);
+ break;
+ case "frontends":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadFrontends",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveFrontends",
CountingDataOutputStream.class, long.class);
+ break;
+ case "backends":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadBackends",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveBackends",
CountingDataOutputStream.class, long.class);
+ break;
+ case "db":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadDb",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveDb",
CountingDataOutputStream.class, long.class);
+ break;
+ case "loadJob":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadLoadJob",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveLoadJob",
CountingDataOutputStream.class, long.class);
+ break;
+ case "alterJob":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadAlterJob",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveAlterJob",
CountingDataOutputStream.class, long.class);
+ break;
+ case "recycleBin":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadRecycleBin",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveRecycleBin",
CountingDataOutputStream.class, long.class);
+ break;
+ case "globalVariable":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadGlobalVariable",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveGlobalVariable",
CountingDataOutputStream.class,
+ long.class);
+ break;
+ case "cluster":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadCluster",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveCluster",
CountingDataOutputStream.class, long.class);
+ break;
+ case "broker":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadBrokers",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveBrokers",
CountingDataOutputStream.class, long.class);
+ break;
+ case "resources":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadResources",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveResources",
CountingDataOutputStream.class, long.class);
+ break;
+ case "exportJob":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadExportJob",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveExportJob",
CountingDataOutputStream.class, long.class);
+ break;
+ case "syncJob":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadSyncJobs",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveSyncJobs",
CountingDataOutputStream.class, long.class);
+ break;
+ case "backupHandler":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadBackupHandler",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveBackupHandler",
CountingDataOutputStream.class, long.class);
+ break;
+ case "paloAuth":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadPaloAuth",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("savePaloAuth",
CountingDataOutputStream.class, long.class);
+ break;
+ case "transactionState":
+ metaPersistMethod.readMethod =
+
Catalog.class.getDeclaredMethod("loadTransactionState", DataInputStream.class,
long.class);
+ metaPersistMethod.writeMethod =
+
Catalog.class.getDeclaredMethod("saveTransactionState",
CountingDataOutputStream.class,
+ long.class);
+ break;
+ case "colocateTableIndex":
+ metaPersistMethod.readMethod =
+
Catalog.class.getDeclaredMethod("loadColocateTableIndex",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+
Catalog.class.getDeclaredMethod("saveColocateTableIndex",
CountingDataOutputStream.class,
+ long.class);
+ break;
+ case "routineLoadJobs":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadRoutineLoadJobs",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveRoutineLoadJobs",
CountingDataOutputStream.class,
+ long.class);
+ break;
+ case "loadJobV2":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadLoadJobsV2",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveLoadJobsV2",
CountingDataOutputStream.class, long.class);
+ break;
+ case "smallFiles":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadSmallFiles",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveSmallFiles",
CountingDataOutputStream.class, long.class);
+ break;
+ case "plugins":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadPlugins",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("savePlugins",
CountingDataOutputStream.class, long.class);
+ break;
+ case "deleteHandler":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadDeleteHandler",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveDeleteHandler",
CountingDataOutputStream.class, long.class);
+ break;
+ case "sqlBlockRule":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadSqlBlockRule",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("saveSqlBlockRule",
CountingDataOutputStream.class, long.class);
+ break;
+ case "policy":
+ metaPersistMethod.readMethod =
+ Catalog.class.getDeclaredMethod("loadPolicy",
DataInputStream.class, long.class);
+ metaPersistMethod.writeMethod =
+ Catalog.class.getDeclaredMethod("savePolicy",
CountingDataOutputStream.class, long.class);
+ break;
+ default:
+ break;
+ }
+ return metaPersistMethod;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java
similarity index 66%
rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java
index bbe474f0df..df2c29cd53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
@@ -29,6 +30,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
/**
* Image Format:
@@ -69,48 +71,36 @@ public class MetaReader {
LOG.info("start load image from {}. is ckpt: {}",
imageFile.getAbsolutePath(), Catalog.isCheckpointThread());
long loadImageStartTime = System.currentTimeMillis();
MetaHeader metaHeader = MetaHeader.read(imageFile);
+ MetaFooter metaFooter = MetaFooter.read(imageFile);
long checksum = 0;
try (DataInputStream dis = new DataInputStream(new
BufferedInputStream(new FileInputStream(imageFile)))) {
+ // 1. Skip image file header
IOUtils.skipFully(dis, metaHeader.getEnd());
+ // 2. Read meta header first
checksum = catalog.loadHeader(dis, metaHeader, checksum);
- checksum = catalog.loadMasterInfo(dis, checksum);
- checksum = catalog.loadFrontends(dis, checksum);
- checksum = Catalog.getCurrentSystemInfo().loadBackends(dis,
checksum);
- checksum = catalog.loadDb(dis, checksum);
- // ATTN: this should be done after load Db, and before loadAlterJob
- catalog.getInternalDataSource().recreateTabletInvertIndex();
- // rebuild es state state
- catalog.getEsRepository().loadTableFromCatalog();
- checksum = catalog.loadLoadJob(dis, checksum);
- checksum = catalog.loadAlterJob(dis, checksum);
- checksum = catalog.loadRecycleBin(dis, checksum);
- checksum = catalog.loadGlobalVariable(dis, checksum);
- checksum = catalog.loadCluster(dis, checksum);
- checksum = catalog.loadBrokers(dis, checksum);
- checksum = catalog.loadResources(dis, checksum);
- checksum = catalog.loadExportJob(dis, checksum);
- checksum = catalog.loadSyncJobs(dis, checksum);
- checksum = catalog.loadBackupHandler(dis, checksum);
- checksum = catalog.loadPaloAuth(dis, checksum);
- // global transaction must be replayed before load jobs v2
- checksum = catalog.loadTransactionState(dis, checksum);
- checksum = catalog.loadColocateTableIndex(dis, checksum);
- checksum = catalog.loadRoutineLoadJobs(dis, checksum);
- checksum = catalog.loadLoadJobsV2(dis, checksum);
- checksum = catalog.loadSmallFiles(dis, checksum);
- checksum = catalog.loadPlugins(dis, checksum);
- checksum = catalog.loadDeleteHandler(dis, checksum);
- checksum = catalog.loadSqlBlockRule(dis, checksum);
- checksum = catalog.loadPolicy(dis, checksum);
+ // 3. Read other meta modules
+ // Modules must be read in the order in which the metadata was written
+ for (MetaIndex metaIndex : metaFooter.metaIndices) {
+ if (metaIndex.name.equals("header")) {
+ // skip meta header, which has been read before.
+ continue;
+ }
+ MetaPersistMethod persistMethod =
PersistMetaModules.MODULES_MAP.get(metaIndex.name);
+ if (persistMethod == null) {
+ throw new IOException("Unknown meta module: " +
metaIndex.name + ". Known moduels: "
+ + PersistMetaModules.MODULE_NAMES);
+ }
+ checksum = (long) persistMethod.readMethod.invoke(catalog,
dis, checksum);
+ }
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new IOException(e);
}
- MetaFooter metaFooter = MetaFooter.read(imageFile);
long remoteChecksum = metaFooter.checksum;
Preconditions.checkState(remoteChecksum == checksum, remoteChecksum +
" vs. " + checksum);
long loadImageEndTime = System.currentTimeMillis();
LOG.info("finished to load image in " + (loadImageEndTime -
loadImageStartTime) + " ms");
}
-
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaWriter.java
similarity index 60%
rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaWriter.java
index 553a022cf9..781394f078 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaWriter.java
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
+package org.apache.doris.persist.meta;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Reference;
import org.apache.doris.common.io.CountingDataOutputStream;
import com.google.common.collect.Lists;
@@ -28,6 +29,7 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
/**
@@ -100,42 +102,32 @@ public class MetaWriter {
long startPosition = MetaHeader.write(imageFile);
List<MetaIndex> metaIndices = Lists.newArrayList();
FileOutputStream imageFileOut = new FileOutputStream(imageFile, true);
- try (CountingDataOutputStream dos = new CountingDataOutputStream(new
BufferedOutputStream(
- imageFileOut), startPosition)) {
+ try (CountingDataOutputStream dos = new CountingDataOutputStream(new
BufferedOutputStream(imageFileOut),
+ startPosition)) {
writer.setDelegate(dos, metaIndices);
long replayedJournalId = catalog.getReplayedJournalId();
- checksum.setRef(writer.doWork("header", () ->
catalog.saveHeader(dos, replayedJournalId, checksum.getRef())));
- checksum.setRef(writer.doWork("masterInfo", () ->
catalog.saveMasterInfo(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("frontends", () ->
catalog.saveFrontends(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("backends", () ->
Catalog.getCurrentSystemInfo().saveBackends(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("db", () -> catalog.saveDb(dos,
checksum.getRef())));
- checksum.setRef(writer.doWork("loadJob", () ->
catalog.saveLoadJob(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("alterJob", () ->
catalog.saveAlterJob(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("recycleBin", () ->
catalog.saveRecycleBin(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("globalVariable", () ->
catalog.saveGlobalVariable(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("cluster", () ->
catalog.saveCluster(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("broker", () ->
catalog.saveBrokers(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("resources", () ->
catalog.saveResources(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("exportJob", () ->
catalog.saveExportJob(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("syncJob", () ->
catalog.saveSyncJobs(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("backupHandler", () ->
catalog.saveBackupHandler(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("paloAuth", () ->
catalog.savePaloAuth(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("transactionState", () ->
catalog.saveTransactionState(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("colocateTableIndex", () ->
catalog.saveColocateTableIndex(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("routineLoadJobs", () ->
catalog.saveRoutineLoadJobs(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("loadJobV2", () ->
catalog.saveLoadJobsV2(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("smallFiles", () ->
catalog.saveSmallFiles(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("plugins", () ->
catalog.savePlugins(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("deleteHandler", () ->
catalog.saveDeleteHandler(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("sqlBlockRule", () ->
catalog.saveSqlBlockRule(dos, checksum.getRef())));
- checksum.setRef(writer.doWork("policy", () ->
catalog.savePolicy(dos, checksum.getRef())));
+ // 1. write header first
+ checksum.setRef(
+ writer.doWork("header", () -> catalog.saveHeader(dos,
replayedJournalId, checksum.getRef())));
+ // 2. write other modules
+ for (MetaPersistMethod m : PersistMetaModules.MODULES_IN_ORDER) {
+ checksum.setRef(writer.doWork(m.name, () -> {
+ try {
+ return (long) m.writeMethod.invoke(catalog, dos,
checksum.getRef());
+ } catch (IllegalAccessException |
InvocationTargetException e) {
+ LOG.warn("failed to write meta module: {}", m.name, e);
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+ // 3. force sync to disk
imageFileOut.getChannel().force(true);
}
MetaFooter.write(imageFile, metaIndices, checksum.getRef());
long saveImageEndTime = System.currentTimeMillis();
- LOG.info("finished save image {} in {} ms. checksum is {}",
- imageFile.getAbsolutePath(), (saveImageEndTime -
saveImageStartTime), checksum.getRef());
+ LOG.info("finished save image {} in {} ms. checksum is {}",
imageFile.getAbsolutePath(),
+ (saveImageEndTime - saveImageStartTime), checksum.getRef());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
new file mode 100644
index 0000000000..6e01f13af9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist.meta;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Save all MetaPersistMethods.
+ */
+public class PersistMetaModules {
+ // module name -> MetaPersistMethod
+ public static final Map<String, MetaPersistMethod> MODULES_MAP;
+ // Save MetaPersistMethod in order.
+ // The write and read of meta modules should be in same order.
+ public static final List<MetaPersistMethod> MODULES_IN_ORDER;
+
+ public static final ImmutableList<String> MODULE_NAMES =
ImmutableList.copyOf(
+ new String[] {"masterInfo", "frontends", "backends", "db",
"loadJob", "alterJob", "recycleBin",
+ "globalVariable", "cluster", "broker", "resources",
"exportJob", "syncJob", "backupHandler",
+ "paloAuth", "transactionState", "colocateTableIndex",
"routineLoadJobs", "loadJobV2", "smallFiles",
+ "plugins", "deleteHandler", "sqlBlockRule", "policy"});
+
+ static {
+ MODULES_MAP = Maps.newHashMap();
+ MODULES_IN_ORDER = Lists.newArrayList();
+ try {
+ for (String name : MODULE_NAMES) {
+ MetaPersistMethod persistMethod =
MetaPersistMethod.create(name);
+ MODULES_MAP.put(name, persistMethod);
+ MODULES_IN_ORDER.add(persistMethod);
+ }
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java
index d86c8e3455..e6d70de68f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java
@@ -18,11 +18,11 @@
package org.apache.doris.catalog;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MetaHeader;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
import org.apache.doris.meta.MetaContext;
+import org.apache.doris.persist.meta.MetaHeader;
import mockit.Expectations;
import org.junit.Assert;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 5d9156c520..5654d44ee6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -179,6 +179,7 @@ public class VariableMgrTest {
Deencapsulation.setField(Catalog.class, "checkpointThreadId",
Thread.currentThread().getId());
currentCatalog.getCheckpointer().doCheckpoint();
} catch (Throwable e) {
+ e.printStackTrace();
Assert.fail(e.getMessage());
} finally {
// Restore the ckptThreadId
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]