This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f2d84d81e6 [feature-wip][refactor](multi-catalog) Persist external
catalog related metadata. (#13746)
f2d84d81e6 is described below
commit f2d84d81e651d3de8e4c67a0adbf927a7ca04268
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Nov 4 09:04:00 2022 +0800
[feature-wip][refactor](multi-catalog) Persist external catalog related
metadata. (#13746)
Persist external catalog/db/table, including the columns of external tables.
After this change, external objects have their own unique ID throughout
their lifetime;
this is required for collecting statistics information.
---
.../main/java/org/apache/doris/catalog/Column.java | 5 +-
.../main/java/org/apache/doris/catalog/Env.java | 2 +-
.../org/apache/doris/catalog/RefreshManager.java | 10 ++
.../doris/catalog/external/EsExternalDatabase.java | 79 +++++++++++-
.../doris/catalog/external/EsExternalTable.java | 79 ++++++++----
.../doris/catalog/external/ExternalDatabase.java | 63 +++++++++-
.../doris/catalog/external/ExternalTable.java | 81 ++++++++++---
.../catalog/external/HMSExternalDatabase.java | 81 ++++++++++++-
.../doris/catalog/external/HMSExternalTable.java | 133 +++++++++++++--------
.../org/apache/doris/datasource/CatalogMgr.java | 50 ++++++++
.../apache/doris/datasource/EsExternalCatalog.java | 49 ++++++--
.../apache/doris/datasource/ExternalCatalog.java | 78 +++++++++++-
.../apache/doris/datasource/ExternalObjectLog.java | 56 +++++++++
.../doris/datasource/HMSExternalCatalog.java | 72 +++++++----
.../apache/doris/datasource/InitCatalogLog.java | 92 ++++++++++++++
.../apache/doris/datasource/InitDatabaseLog.java | 96 +++++++++++++++
.../org/apache/doris/datasource/InitTableLog.java | 67 +++++++++++
.../doris/external/elasticsearch/EsUtil.java | 2 +
.../org/apache/doris/journal/JournalEntity.java | 25 ++++
.../java/org/apache/doris/persist/EditLog.java | 49 ++++++++
.../org/apache/doris/persist/OperationType.java | 5 +
.../org/apache/doris/persist/gson/GsonUtils.java | 26 +++-
.../org/apache/doris/qe/MasterCatalogExecutor.java | 84 +++++++++++++
.../apache/doris/service/FrontendServiceImpl.java | 76 ++++++++++++
.../apache/doris/datasource/CatalogMgrTest.java | 42 ++++++-
gensrc/thrift/FrontendService.thrift | 12 ++
26 files changed, 1273 insertions(+), 141 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index a3d529d718..532a49b0eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -597,10 +597,9 @@ public class Column implements Writable {
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
- && comment.equals(other.comment)
+ && Objects.equals(comment, other.comment)
&& visible == other.visible
- && children.size() == other.children.size()
- && children.equals(other.children);
+ && Objects.equals(children, other.children);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e665064a35..05bfc80efa 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1937,7 +1937,7 @@ public class Env {
**/
public long loadCatalog(DataInputStream in, long checksum) throws
IOException {
CatalogMgr mgr = CatalogMgr.read(in);
- // When enable the multi catalog in the first time, the mgr will be a
null value.
+ // When enable the multi catalog in the first time, the "mgr" will be
a null value.
// So ignore it to use default catalog manager.
if (mgr != null) {
this.catalogMgr = mgr;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 31cbc266f5..bb37a9b1b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.logging.log4j.LogManager;
@@ -117,6 +118,10 @@ public class RefreshManager {
throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
}
((ExternalDatabase) db).setUnInitialized();
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log);
}
private void refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env
env) throws UserException {
@@ -156,5 +161,10 @@ public class RefreshManager {
throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
}
((ExternalTable) table).setUnInitialized();
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
index 8898ede84d..f9ba015826 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -20,27 +20,34 @@ package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Elasticsearch metastore external database.
*/
-public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> {
+public class EsExternalDatabase extends ExternalDatabase<EsExternalTable>
implements GsonPostProcessable {
private static final Logger LOG =
LogManager.getLogger(EsExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
- private Map<Long, EsExternalTable> idToTbl = Maps.newHashMap();
+ @SerializedName(value = "idToTbl")
+ private Map<Long, EsExternalTable> idToTbl = Maps.newConcurrentMap();
/**
* Create Elasticsearch external database.
@@ -53,14 +60,53 @@ public class EsExternalDatabase extends
ExternalDatabase<EsExternalTable> {
super(extCatalog, id, name);
}
- private synchronized void makeSureInitialized() {
+ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+ Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+ Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
+ for (int i = 0; i < log.getRefreshCount(); i++) {
+ EsExternalTable table =
getTableForReplay(log.getRefreshTableIds().get(i));
+ table.setUnInitialized();
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ for (int i = 0; i < log.getCreateCount(); i++) {
+ EsExternalTable table = new
EsExternalTable(log.getCreateTableIds().get(i),
+ log.getCreateTableNames().get(i), name,
(EsExternalCatalog) catalog);
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ tableNameToId = tmpTableNameToId;
+ idToTbl = tmpIdToTbl;
+ initialized = true;
+ }
+
+ public void setTableExtCatalog(ExternalCatalog extCatalog) {
+ for (EsExternalTable table : idToTbl.values()) {
+ table.setCatalog(extCatalog);
+ }
+ }
+
+ public synchronized void makeSureInitialized() {
if (!initialized) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(extCatalog.getId(), id, -1);
+ } catch (Exception e) {
+ LOG.warn("Failed to forward init db {} operation to
master. {}", name, e.getMessage());
+ }
+ return;
+ }
init();
- initialized = true;
}
}
private void init() {
+ InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+ initDatabaseLog.setType(InitDatabaseLog.Type.ES);
+ initDatabaseLog.setCatalogId(extCatalog.getId());
+ initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
@@ -73,15 +119,20 @@ public class EsExternalDatabase extends
ExternalDatabase<EsExternalTable> {
EsExternalTable table = idToTbl.get(tblId);
table.setUnInitialized();
tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
- tmpIdToTbl.put(tblId, new EsExternalTable(tblId,
tableName, name, (EsExternalCatalog) extCatalog));
+ EsExternalTable table = new EsExternalTable(tblId,
tableName, name, (EsExternalCatalog) extCatalog);
+ tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
+ initialized = true;
+ Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
@Override
@@ -110,4 +161,22 @@ public class EsExternalDatabase extends
ExternalDatabase<EsExternalTable> {
makeSureInitialized();
return idToTbl.get(tableId);
}
+
+ public EsExternalTable getTableForReplay(long tableId) {
+ return idToTbl.get(tableId);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ tableNameToId = Maps.newConcurrentMap();
+ for (EsExternalTable tbl : idToTbl.values()) {
+ tableNameToId.put(tbl.getName(), tbl.getId());
+ }
+ rwLock = new ReentrantReadWriteLock(true);
+ }
+
+ public void addTableForTest(EsExternalTable tbl) {
+ idToTbl.put(tbl.getId(), tbl);
+ tableNameToId.put(tbl.getName(), tbl.getId());
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
index ea44552896..15d1384744 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -18,9 +18,12 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.datasource.EsExternalCatalog;
+import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -36,9 +39,6 @@ import java.util.List;
public class EsExternalTable extends ExternalTable {
private static final Logger LOG =
LogManager.getLogger(EsExternalTable.class);
-
- private final EsExternalCatalog catalog;
- private final String dbName;
private EsTable esTable;
/**
@@ -50,23 +50,57 @@ public class EsExternalTable extends ExternalTable {
* @param catalog HMSExternalDataSource.
*/
public EsExternalTable(long id, String name, String dbName,
EsExternalCatalog catalog) {
- super(id, name);
- this.dbName = dbName;
- this.catalog = catalog;
- this.type = TableType.ES_EXTERNAL_TABLE;
+ super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE);
}
- private synchronized void makeSureInitialized() {
+ public synchronized void makeSureInitialized() {
if (!initialized) {
- init();
- initialized = true;
+ if (!Env.getCurrentEnv().isMaster()) {
+ fullSchema = null;
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(catalog.getId(),
catalog.getDbNullable(dbName).getId(), id);
+ } catch (Exception e) {
+ LOG.warn("Failed to forward init table {} operation to
master. {}", name, e.getMessage());
+ }
+ } else {
+ init();
+ }
+ }
+ if (!objectCreated) {
+ esTable = toEsTable();
+ objectCreated = true;
}
}
private void init() {
- fullSchema = EsUtil.genColumnsFromEs(catalog.getEsRestClient(), name,
null);
- esTable = toEsTable();
+ boolean schemaChanged = false;
+ List<Column> tmpSchema = EsUtil.genColumnsFromEs(
+ ((EsExternalCatalog) catalog).getEsRestClient(), name, null);
+ if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
+ schemaChanged = true;
+ } else {
+ for (int i = 0; i < fullSchema.size(); i++) {
+ if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
+ schemaChanged = true;
+ break;
+ }
+ }
+ }
+ if (schemaChanged) {
+ timestamp = System.currentTimeMillis();
+ fullSchema = tmpSchema;
+ esTable = toEsTable();
+ }
+ initialized = true;
+ InitTableLog initTableLog = new InitTableLog();
+ initTableLog.setCatalogId(catalog.getId());
+ initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
+ initTableLog.setTableId(id);
+ initTableLog.setSchema(fullSchema);
+ Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
}
@Override
@@ -107,7 +141,7 @@ public class EsExternalTable extends ExternalTable {
}
/**
- * get database name of hms table.
+ * get database name of es table.
*/
public String getDbName() {
return dbName;
@@ -123,17 +157,18 @@ public class EsExternalTable extends ExternalTable {
}
private EsTable toEsTable() {
+ EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
EsTable esTable = new EsTable(this.id, this.name, this.fullSchema,
TableType.ES_EXTERNAL_TABLE);
esTable.setIndexName(name);
- esTable.setClient(catalog.getEsRestClient());
- esTable.setUserName(catalog.getUsername());
- esTable.setPasswd(catalog.getPassword());
- esTable.setEnableDocValueScan(catalog.isEnableDocValueScan());
- esTable.setEnableKeywordSniff(catalog.isEnableKeywordSniff());
- esTable.setNodesDiscovery(catalog.isEnableNodesDiscovery());
- esTable.setHttpSslEnabled(catalog.isEnableSsl());
- esTable.setSeeds(catalog.getNodes());
- esTable.setHosts(String.join(",", catalog.getNodes()));
+ esTable.setClient(esCatalog.getEsRestClient());
+ esTable.setUserName(esCatalog.getUsername());
+ esTable.setPasswd(esCatalog.getPassword());
+ esTable.setEnableDocValueScan(esCatalog.isEnableDocValueScan());
+ esTable.setEnableKeywordSniff(esCatalog.isEnableKeywordSniff());
+ esTable.setNodesDiscovery(esCatalog.isEnableNodesDiscovery());
+ esTable.setHttpSslEnabled(esCatalog.isEnableSsl());
+ esTable.setSeeds(esCatalog.getNodes());
+ esTable.setHosts(String.join(",", esCatalog.getNodes()));
esTable.syncTableMetaData();
return esTable;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index d76ed380a7..c5fa7b9875 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -21,13 +21,22 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DatabaseProperty;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
+import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -38,17 +47,28 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* @param <T> External table type is ExternalTable or its subclass.
*/
-public class ExternalDatabase<T extends ExternalTable> implements
DatabaseIf<T> {
+public class ExternalDatabase<T extends ExternalTable> implements
DatabaseIf<T>, Writable, GsonPostProcessable {
private static final Logger LOG =
LogManager.getLogger(ExternalDatabase.class);
- private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ @SerializedName(value = "id")
protected long id;
+ @SerializedName(value = "name")
protected String name;
- protected ExternalCatalog extCatalog;
- protected DatabaseProperty dbProperties;
+ @SerializedName(value = "dbProperties")
+ protected DatabaseProperty dbProperties = new DatabaseProperty();
+ @SerializedName(value = "initialized")
protected boolean initialized = false;
+ protected ExternalCatalog extCatalog;
+
+ /**
+ * No args constructor for persist.
+ */
+ public ExternalDatabase() {
+ initialized = false;
+ }
/**
* Create external database.
@@ -63,10 +83,30 @@ public class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>
this.name = name;
}
- public synchronized void setUnInitialized() {
+ public void setExtCatalog(ExternalCatalog extCatalog) {
+ this.extCatalog = extCatalog;
+ }
+
+ public void setTableExtCatalog(ExternalCatalog extCatalog) {}
+
+ public void setUnInitialized() {
this.initialized = false;
}
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ public void makeSureInitialized() {}
+
+ public T getTableForReplay(long tableId) {
+ throw new NotImplementedException();
+ }
+
+ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+ throw new NotImplementedException();
+ }
+
@Override
public void readLock() {
this.rwLock.readLock().lock();
@@ -177,4 +217,17 @@ public class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>
public T getTableNullable(long tableId) {
throw new NotImplementedException();
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static ExternalDatabase read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, ExternalDatabase.class);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index 33acbc0ec7..f713de7419 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -22,12 +22,22 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TTableDescriptor;
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
import org.apache.commons.lang.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -36,26 +46,36 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* External table represent tables that are not self-managed by Doris.
* Such as tables from hive, iceberg, es, etc.
*/
-public class ExternalTable implements TableIf {
-
+@Getter
+public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
private static final Logger LOG =
LogManager.getLogger(ExternalTable.class);
+ @SerializedName(value = "id")
protected long id;
+ @SerializedName(value = "name")
protected String name;
- protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ @SerializedName(value = "type")
protected TableType type = null;
+ @SerializedName(value = "fullSchema")
protected volatile List<Column> fullSchema = null;
+ @SerializedName(value = "initialized")
protected boolean initialized = false;
+ @SerializedName(value = "timestamp")
+ protected long timestamp;
+
+ protected ExternalCatalog catalog;
+ @SerializedName(value = "dbName")
+ protected String dbName;
+ protected boolean objectCreated = false;
+ protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
/**
- * Create external table.
- *
- * @param id Table id.
- * @param name Table name.
+ * No args constructor for persist.
*/
- public ExternalTable(long id, String name) {
- this.id = id;
- this.name = name;
+ public ExternalTable() {
+ this.initialized = false;
+ this.objectCreated = false;
+ this.fullSchema = null;
}
/**
@@ -63,23 +83,40 @@ public class ExternalTable implements TableIf {
*
* @param id Table id.
* @param name Table name.
+ * @param catalog ExternalCatalog this table belongs to.
+ * @param dbName Name of the db the this table belongs to.
* @param type Table type.
*/
- public ExternalTable(long id, String name, TableType type) {
+ public ExternalTable(long id, String name, ExternalCatalog catalog, String
dbName, TableType type) {
this.id = id;
this.name = name;
+ this.catalog = catalog;
+ this.dbName = dbName;
this.type = type;
+ this.initialized = false;
+ this.objectCreated = false;
+ this.fullSchema = null;
+ }
+
+ public void setCatalog(ExternalCatalog catalog) {
+ this.catalog = catalog;
}
public boolean isView() {
return false;
}
- public synchronized void setUnInitialized() {
+ public void setUnInitialized() {
this.initialized = false;
- this.fullSchema = null;
}
+ public void replayInitTable(List<Column> schema) {
+ fullSchema = schema;
+ initialized = true;
+ }
+
+ public void makeSureInitialized() {}
+
@Override
public void readLock() {
this.rwLock.readLock().lock();
@@ -260,10 +297,26 @@ public class ExternalTable implements TableIf {
@Override
public String getComment(boolean escapeQuota) {
return "";
-
}
public TTableDescriptor toThrift() {
return null;
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static ExternalTable read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, ExternalTable.class);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ rwLock = new ReentrantReadWriteLock(true);
+ objectCreated = false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index b8d9287982..dc79a679f6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -21,28 +21,35 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* Hive metastore external database.
*/
-public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
+public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable>
implements GsonPostProcessable {
private static final Logger LOG =
LogManager.getLogger(HMSExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
- private Map<Long, HMSExternalTable> idToTbl = Maps.newHashMap();
+ @SerializedName(value = "idToTbl")
+ private Map<Long, HMSExternalTable> idToTbl = Maps.newConcurrentMap();
/**
* Create HMS external database.
@@ -55,14 +62,53 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> {
super(extCatalog, id, name);
}
- private synchronized void makeSureInitialized() {
+ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+ Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+ Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
+ for (int i = 0; i < log.getRefreshCount(); i++) {
+ HMSExternalTable table =
getTableForReplay(log.getRefreshTableIds().get(i));
+ table.setUnInitialized();
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ for (int i = 0; i < log.getCreateCount(); i++) {
+ HMSExternalTable table = new
HMSExternalTable(log.getCreateTableIds().get(i),
+ log.getCreateTableNames().get(i), name,
(HMSExternalCatalog) catalog);
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ tableNameToId = tmpTableNameToId;
+ idToTbl = tmpIdToTbl;
+ initialized = true;
+ }
+
+ public void setTableExtCatalog(ExternalCatalog extCatalog) {
+ for (HMSExternalTable table : idToTbl.values()) {
+ table.setCatalog(extCatalog);
+ }
+ }
+
+ public synchronized void makeSureInitialized() {
if (!initialized) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(extCatalog.getId(), id, -1);
+ } catch (Exception e) {
+ LOG.warn("Failed to forward init db {} operation to
master. {}", name, e.getMessage());
+ }
+ return;
+ }
init();
- initialized = true;
}
}
private void init() {
+ InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+ initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
+ initDatabaseLog.setCatalogId(extCatalog.getId());
+ initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
@@ -75,16 +121,21 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> {
HMSExternalTable table = idToTbl.get(tblId);
table.setUnInitialized();
tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
- tmpIdToTbl.put(tblId,
- new HMSExternalTable(tblId, tableName, name,
(HMSExternalCatalog) extCatalog));
+ HMSExternalTable table = new HMSExternalTable(tblId,
tableName, name,
+ (HMSExternalCatalog) extCatalog);
+ tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
+ initialized = true;
+ Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
@Override
@@ -119,4 +170,22 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> {
makeSureInitialized();
return idToTbl.get(tableId);
}
+
+ public HMSExternalTable getTableForReplay(long tableId) {
+ return idToTbl.get(tableId);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ tableNameToId = Maps.newConcurrentMap();
+ for (HMSExternalTable tbl : idToTbl.values()) {
+ tableNameToId.put(tbl.getName(), tbl.getId());
+ }
+ rwLock = new ReentrantReadWriteLock(true);
+ }
+
+ public void addTableForTest(HMSExternalTable tbl) {
+ idToTbl.put(tbl.getId(), tbl);
+ tableNameToId.put(tbl.getName(), tbl.getId());
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index cbe84744ea..9d27cf0fb6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,10 +18,13 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.InitTableLog;
+import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -39,12 +42,9 @@ import java.util.Map;
* Hive metastore external table.
*/
public class HMSExternalTable extends ExternalTable {
-
private static final Logger LOG =
LogManager.getLogger(HMSExternalTable.class);
- private final HMSExternalCatalog catalog;
- private final String dbName;
- private final List<String> supportedHiveFileFormats = Lists.newArrayList(
+ private List<String> supportedHiveFileFormats = Lists.newArrayList(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.mapred.TextInputFormat");
@@ -65,10 +65,7 @@ public class HMSExternalTable extends ExternalTable {
* @param catalog HMSExternalCatalog.
*/
public HMSExternalTable(long id, String name, String dbName,
HMSExternalCatalog catalog) {
- super(id, name);
- this.dbName = dbName;
- this.catalog = catalog;
- this.type = TableType.HMS_EXTERNAL_TABLE;
+ super(id, name, catalog, dbName, TableType.HMS_EXTERNAL_TABLE);
}
public boolean isSupportedHmsTable() {
@@ -76,34 +73,45 @@ public class HMSExternalTable extends ExternalTable {
return dlaType != DLAType.UNKNOWN;
}
- private synchronized void makeSureInitialized() {
- if (!initialized) {
- init();
- initialized = true;
- }
- }
-
- private void init() {
- try {
- getRemoteTable();
- } catch (MetaNotFoundException e) {
- // CHECKSTYLE IGNORE THIS LINE
- }
- if (remoteTable == null) {
- dlaType = DLAType.UNKNOWN;
- fullSchema = Lists.newArrayList();
- } else {
- if (supportedIcebergTable()) {
- dlaType = DLAType.ICEBERG;
- } else if (supportedHoodieTable()) {
- dlaType = DLAType.HUDI;
- } else if (supportedHiveTable()) {
- dlaType = DLAType.HIVE;
- } else {
+ public synchronized void makeSureInitialized() {
+ if (!objectCreated) {
+ try {
+ getRemoteTable();
+ } catch (MetaNotFoundException e) {
+ // CHECKSTYLE IGNORE THIS LINE
+ }
+ supportedHiveFileFormats = Lists.newArrayList(
+
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+ "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+ "org.apache.hadoop.mapred.TextInputFormat");
+ if (remoteTable == null) {
dlaType = DLAType.UNKNOWN;
- fullSchema = Lists.newArrayList();
+ } else {
+ if (supportedIcebergTable()) {
+ dlaType = DLAType.ICEBERG;
+ } else if (supportedHoodieTable()) {
+ dlaType = DLAType.HUDI;
+ } else if (supportedHiveTable()) {
+ dlaType = DLAType.HIVE;
+ } else {
+ dlaType = DLAType.UNKNOWN;
+ }
}
- initSchema();
+ objectCreated = true;
+ }
+ if (!initialized) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ fullSchema = null;
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(catalog.getId(),
catalog.getDbNullable(dbName).getId(), id);
+ } catch (Exception e) {
+ LOG.warn("Failed to forward init table {} operation to
master. {}", name, e.getMessage());
+ }
+ return;
+ }
+ init();
}
}
@@ -150,24 +158,45 @@ public class HMSExternalTable extends ExternalTable {
return isManagedTable && supportedFileFormat;
}
- private void initSchema() {
- if (fullSchema == null) {
- synchronized (this) {
- if (fullSchema == null) {
- fullSchema = Lists.newArrayList();
- try {
- for (FieldSchema field :
HiveMetaStoreClientHelper.getSchema(dbName, name,
- catalog.getHiveMetastoreUris())) {
- fullSchema.add(new Column(field.getName(),
-
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true,
- null, field.getComment()));
- }
- } catch (DdlException e) {
- LOG.warn("Fail to get schema of hms table {}", name,
e);
+ private void init() {
+ boolean schemaChanged = false;
+ List<Column> tmpSchema = Lists.newArrayList();
+ if (dlaType.equals(DLAType.UNKNOWN)) {
+ schemaChanged = true;
+ } else {
+ try {
+ for (FieldSchema field :
HiveMetaStoreClientHelper.getSchema(dbName, name,
+ ((HMSExternalCatalog)
catalog).getHiveMetastoreUris())) {
+ int columnId = (int) Env.getCurrentEnv().getNextId();
+ tmpSchema.add(new Column(field.getName(),
+
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+ true, null, field.getComment(), true, null,
columnId));
+ }
+ } catch (DdlException e) {
+ LOG.warn("Fail to get schema of hms table {}", name, e);
+ }
+ if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
+ schemaChanged = true;
+ } else {
+ for (int i = 0; i < fullSchema.size(); i++) {
+ if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
+ schemaChanged = true;
+ break;
}
}
}
}
+ if (schemaChanged) {
+ timestamp = System.currentTimeMillis();
+ fullSchema = tmpSchema;
+ }
+ initialized = true;
+ InitTableLog initTableLog = new InitTableLog();
+ initTableLog.setCatalogId(catalog.getId());
+ initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
+ initTableLog.setTableId(id);
+ initTableLog.setSchema(fullSchema);
+ Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
}
/**
@@ -177,11 +206,11 @@ public class HMSExternalTable extends ExternalTable {
if (remoteTable == null) {
synchronized (this) {
if (remoteTable == null) {
+ String uri = ((HMSExternalCatalog)
catalog).getHiveMetastoreUris();
try {
- remoteTable =
HiveMetaStoreClientHelper.getTable(dbName, name,
catalog.getHiveMetastoreUris());
+ remoteTable =
HiveMetaStoreClientHelper.getTable(dbName, name, uri);
} catch (DdlException e) {
- LOG.warn("Fail to get remote hive table. db {}, table
{}, uri {}", dbName, name,
- catalog.getHiveMetastoreUris());
+ LOG.warn("Fail to get remote hive table. db {}, table
{}, uri {}", dbName, name, uri);
throw new MetaNotFoundException(e);
}
}
@@ -300,7 +329,7 @@ public class HMSExternalTable extends ExternalTable {
}
public String getMetastoreUri() {
- return catalog.getHiveMetastoreUris();
+ return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
}
public Map<String, String> getDfsProperties() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 5fd6079d5c..b92a515751 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -25,6 +25,8 @@ import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.ShowCatalogStmt;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@@ -39,6 +41,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@@ -406,6 +409,53 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
}
}
+    /**
+     * Replays an {@link InitCatalogLog} journal entry by handing it to the external catalog it names.
+     * The catalog must already be registered in {@code idToCatalog}; otherwise the
+     * precondition check fails with an {@link IllegalArgumentException}.
+     *
+     * @param log journal entry carrying the id of the catalog to initialize
+     */
+    public void replayInitCatalog(InitCatalogLog log) {
+        long catalogId = log.getCatalogId();
+        ExternalCatalog target = (ExternalCatalog) idToCatalog.get(catalogId);
+        Preconditions.checkArgument(target != null);
+        target.replayInitCatalog(log);
+    }
+
+    /**
+     * Replays an {@link InitDatabaseLog} journal entry on the external database it names.
+     * Both the catalog and the database must already exist; a missing one fails the
+     * precondition check with an {@link IllegalArgumentException}.
+     *
+     * @param log journal entry carrying the catalog id and database id
+     */
+    public void replayInitExternalDb(InitDatabaseLog log) {
+        ExternalCatalog owner = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        Preconditions.checkArgument(owner != null);
+
+        ExternalDatabase targetDb = owner.getDbForReplay(log.getDbId());
+        Preconditions.checkArgument(targetDb != null);
+
+        targetDb.replayInitDb(log, owner);
+    }
+
+    /**
+     * Replays an {@link InitTableLog} journal entry by installing the logged schema on the
+     * external table it names. The catalog, database and table are resolved in turn, and each
+     * must exist; a missing one fails the precondition check with an {@link IllegalArgumentException}.
+     *
+     * @param log journal entry carrying the catalog/database/table ids and the table schema
+     */
+    public void replayInitExternalTable(InitTableLog log) {
+        ExternalCatalog owner = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        Preconditions.checkArgument(owner != null);
+
+        ExternalDatabase parentDb = owner.getDbForReplay(log.getDbId());
+        Preconditions.checkArgument(parentDb != null);
+
+        ExternalTable targetTable = parentDb.getTableForReplay(log.getTableId());
+        Preconditions.checkArgument(targetTable != null);
+
+        targetTable.replayInitTable(log.getSchema());
+    }
+
+    /**
+     * Replays a refresh of an external database by marking it as uninitialized, under the
+     * manager's write lock.
+     *
+     * <p>The catalog and database are validated the same way the replayInit* methods do, so a
+     * journal entry that names a missing object fails with a descriptive
+     * {@link IllegalArgumentException} instead of a bare NullPointerException.
+     *
+     * @param log journal entry carrying the catalog id and database id
+     */
+    public void replayRefreshExternalDb(ExternalObjectLog log) {
+        writeLock();
+        try {
+            ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+            Preconditions.checkArgument(catalog != null,
+                    "external catalog %s does not exist", log.getCatalogId());
+            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+            Preconditions.checkArgument(db != null,
+                    "external database %s does not exist in catalog %s", log.getDbId(), log.getCatalogId());
+            db.setUnInitialized();
+        } finally {
+            // Always release the lock, including when a precondition fails.
+            writeUnlock();
+        }
+    }
+
+    /**
+     * Replays a refresh of an external table by marking it as uninitialized, under the
+     * manager's write lock.
+     *
+     * <p>The catalog, database and table are validated the same way the replayInit* methods do,
+     * so a journal entry that names a missing object fails with a descriptive
+     * {@link IllegalArgumentException} instead of a bare NullPointerException.
+     *
+     * @param log journal entry carrying the catalog id, database id and table id
+     */
+    public void replayRefreshExternalTable(ExternalObjectLog log) {
+        writeLock();
+        try {
+            ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+            Preconditions.checkArgument(catalog != null,
+                    "external catalog %s does not exist", log.getCatalogId());
+            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+            Preconditions.checkArgument(db != null,
+                    "external database %s does not exist in catalog %s", log.getDbId(), log.getCatalogId());
+            ExternalTable table = db.getTableForReplay(log.getTableId());
+            Preconditions.checkArgument(table != null,
+                    "external table %s does not exist in database %s", log.getTableId(), log.getDbId());
+            table.setUnInitialized();
+        } finally {
+            // Always release the lock, including when a precondition fails.
+            writeUnlock();
+        }
+    }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index baa2c6e169..180d5730bf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -25,6 +25,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -54,10 +55,6 @@ public class EsExternalCatalog extends ExternalCatalog {
private static final String PROP_NODES_DISCOVERY =
"elasticsearch.nodes_discovery";
private static final String PROP_SSL = "elasticsearch.ssl";
- // Cache of db name to db id.
- private Map<String, Long> dbNameToId;
- private Map<Long, EsExternalDatabase> idToDb;
-
private EsRestClient esRestClient;
private String[] nodes;
@@ -138,29 +135,56 @@ public class EsExternalCatalog extends ExternalCatalog {
* Datasource can't be init when creating because the external datasource
may depend on third system.
* So you have to make sure the client of third system is initialized
before any method was called.
*/
- private synchronized void makeSureInitialized() {
+ @Override
+ public synchronized void makeSureInitialized() {
+ if (!objectCreated) {
+ try {
+ validate(catalogProperty.getProperties());
+ } catch (DdlException e) {
+ LOG.warn("validate error", e);
+ }
+ esRestClient = new EsRestClient(this.nodes, this.username,
this.password, this.enableSsl);
+ objectCreated = true;
+ }
if (!initialized) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(id, -1, -1);
+ } catch (Exception e) {
+ LOG.warn("Failed to forward init catalog {} operation to
master. {}", name, e.getMessage());
+ }
+ return;
+ }
init();
- initialized = true;
}
}
private void init() {
+ InitCatalogLog initCatalogLog = new InitCatalogLog();
try {
validate(this.catalogProperty.getProperties());
} catch (DdlException e) {
LOG.warn("validate error", e);
}
this.esRestClient = new EsRestClient(this.nodes, this.username,
this.password, this.enableSsl);
+ initCatalogLog.setCatalogId(id);
+ initCatalogLog.setType(InitCatalogLog.Type.ES);
if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) {
idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized();
+ initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB));
} else {
dbNameToId = Maps.newConcurrentMap();
idToDb = Maps.newConcurrentMap();
long defaultDbId = Env.getCurrentEnv().getNextId();
dbNameToId.put(DEFAULT_DB, defaultDbId);
- idToDb.put(defaultDbId, new EsExternalDatabase(this, defaultDbId,
DEFAULT_DB));
+ EsExternalDatabase db = new EsExternalDatabase(this, defaultDbId,
DEFAULT_DB);
+ idToDb.put(defaultDbId, db);
+ initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB);
}
+ initialized = true;
+ Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
@Override
@@ -185,6 +209,13 @@ public class EsExternalCatalog extends ExternalCatalog {
return idToDb.get(dbNameToId.get(realDbName));
}
+    /**
+     * Looks up a database of this catalog by id, after making sure the catalog is initialized.
+     *
+     * @param dbId id of the database
+     * @return the database registered under {@code dbId}, or {@code null} if there is none
+     */
+    @Nullable
+    @Override
+    public ExternalDatabase getDbNullable(long dbId) {
+        makeSureInitialized();
+        ExternalDatabase found = idToDb.get(dbId);
+        return found;
+    }
+
@Override
public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
return esRestClient.existIndex(this.esRestClient.getClient(), tblName);
@@ -194,4 +225,8 @@ public class EsExternalCatalog extends ExternalCatalog {
public List<Long> getDbIds() {
return Lists.newArrayList(dbNameToId.values());
}
+
+    /**
+     * Returns the database registered under {@code dbId} without triggering catalog
+     * initialization (unlike {@code getDbNullable}, it does not call makeSureInitialized()).
+     *
+     * @param dbId id of the database
+     * @return the database, or {@code null} if no database has that id
+     */
+    @Override
+    public ExternalDatabase getDbForReplay(long dbId) {
+        return idToDb.get(dbId);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a89e53825e..e6e9609ab3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -17,15 +17,21 @@
package org.apache.doris.datasource;
+import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
+import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
@@ -38,7 +44,9 @@ import java.util.Map;
* The abstract class for all types of external catalogs.
*/
@Data
-public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>,
Writable {
+public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>,
Writable, GsonPostProcessable {
+ private static final Logger LOG =
LogManager.getLogger(ExternalCatalog.class);
+
// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
@@ -49,8 +57,16 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
// save properties of this catalog, such as hive meta store url.
@SerializedName(value = "catalogProperty")
protected CatalogProperty catalogProperty = new CatalogProperty();
+ @SerializedName(value = "initialized")
protected boolean initialized = false;
+ // Cache of db id to db
+ @SerializedName(value = "idToDb")
+ protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap();
+ // Cache of db name to db id. The db name does not contain "default_cluster".
+ protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
+ protected boolean objectCreated = false;
+
/**
* @return names of database in this catalog.
*/
@@ -71,6 +87,16 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
*/
public abstract boolean tableExist(SessionContext ctx, String dbName,
String tblName);
+ /**
+ * Makes sure this catalog is ready for use before its metadata is accessed.
+ * Each concrete catalog type supplies its own implementation.
+ */
+ public abstract void makeSureInitialized();
+
+ /**
+ * Sets the flag that records whether this catalog has been initialized.
+ *
+ * @param initialized new value of the flag
+ */
+ public void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+
+ /**
+ * Returns the database with the given id for use while replaying journal entries.
+ * This base implementation is a placeholder that always throws; concrete catalogs
+ * are expected to provide their own version.
+ *
+ * @param dbId id of the database to look up
+ * @throws NotImplementedException always, in this base implementation
+ */
+ public ExternalDatabase getDbForReplay(long dbId) {
+ throw new NotImplementedException();
+ }
+
@Override
public long getId() {
return id;
@@ -123,6 +149,40 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
+    /**
+     * Rebuilds this catalog's database maps from an {@link InitCatalogLog} journal entry.
+     *
+     * <p>Databases listed as refreshed are looked up via {@link #getDbForReplay(long)}, marked
+     * uninitialized and carried over. Databases listed as created are instantiated according to
+     * the log's catalog type (HMS or ES); any other type creates none. The new maps then replace
+     * the current ones and the catalog is flagged as initialized.
+     *
+     * @param log journal entry describing the refreshed and newly created databases
+     */
+    public void replayInitCatalog(InitCatalogLog log) {
+        Map<String, Long> nameToId = Maps.newConcurrentMap();
+        Map<Long, ExternalDatabase> dbsById = Maps.newConcurrentMap();
+
+        // Carry over databases that already exist, resetting their initialized state.
+        for (int idx = 0; idx < log.getRefreshCount(); idx++) {
+            ExternalDatabase refreshed = getDbForReplay(log.getRefreshDbIds().get(idx));
+            refreshed.setUnInitialized();
+            nameToId.put(refreshed.getFullName(), refreshed.getId());
+            dbsById.put(refreshed.getId(), refreshed);
+        }
+
+        // Instantiate the databases that the master created, using the catalog-specific class.
+        switch (log.getType()) {
+            case HMS:
+                for (int idx = 0; idx < log.getCreateCount(); idx++) {
+                    HMSExternalDatabase created = new HMSExternalDatabase(
+                            this, log.getCreateDbIds().get(idx), log.getCreateDbNames().get(idx));
+                    nameToId.put(created.getFullName(), created.getId());
+                    dbsById.put(created.getId(), created);
+                }
+                break;
+            case ES:
+                for (int idx = 0; idx < log.getCreateCount(); idx++) {
+                    EsExternalDatabase created = new EsExternalDatabase(
+                            this, log.getCreateDbIds().get(idx), log.getCreateDbNames().get(idx));
+                    nameToId.put(created.getFullName(), created.getId());
+                    dbsById.put(created.getId(), created);
+                }
+                break;
+            default:
+                break;
+        }
+
+        dbNameToId = nameToId;
+        idToDb = dbsById;
+        initialized = true;
+    }
+
/**
* External catalog has no cluster semantics.
*/
@@ -134,4 +194,20 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ExternalCatalog.class);
}
+
+    /**
+     * Restores the state that is not serialized after this catalog has been read back by Gson:
+     * rebuilds the name-to-id map from {@code idToDb}, re-attaches this catalog to every database
+     * and its tables, and resets {@code objectCreated} to false.
+     */
+    @Override
+    public void gsonPostProcess() throws IOException {
+        dbNameToId = Maps.newConcurrentMap();
+        idToDb.values().forEach(database -> {
+            String shortName = ClusterNamespace.getNameFromFullName(database.getFullName());
+            dbNameToId.put(shortName, database.getId());
+            database.setExtCatalog(this);
+            database.setTableExtCatalog(this);
+        });
+        objectCreated = false;
+    }
+
+    /**
+     * Registers a database in both lookup maps. As the name says, intended for tests.
+     *
+     * @param db database to register; keyed by its id and by its name with the cluster prefix removed
+     */
+    public void addDatabaseForTest(ExternalDatabase db) {
+        long dbId = db.getId();
+        String shortName = ClusterNamespace.getNameFromFullName(db.getFullName());
+        idToDb.put(dbId, db);
+        dbNameToId.put(shortName, dbId);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
new file mode 100644
index 0000000000..cff446657e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Journal payload that identifies an external object by catalog id, database id and table id.
+ * It is written to and read from the edit log as a single JSON string.
+ */
+@NoArgsConstructor
+@Getter
+@Data
+public class ExternalObjectLog implements Writable {
+    @SerializedName("catalogId")
+    private long catalogId;
+
+    @SerializedName("dbId")
+    private long dbId;
+
+    @SerializedName("tableId")
+    private long tableId;
+
+    /** Serializes this object to JSON and writes it as one string. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    /** Reads one JSON string from {@code in} and deserializes it into an ExternalObjectLog. */
+    public static ExternalObjectLog read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), ExternalObjectLog.class);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 32847844dc..6154ddd4af 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -32,6 +33,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,9 +43,6 @@ import java.util.Map;
public class HMSExternalCatalog extends ExternalCatalog {
private static final Logger LOG =
LogManager.getLogger(HMSExternalCatalog.class);
- // Cache of db name to db id.
- private Map<String, Long> dbNameToId = Maps.newConcurrentMap();
- private Map<Long, HMSExternalDatabase> idToDb = Maps.newConcurrentMap();
protected HiveMetaStoreClient client;
/**
@@ -63,15 +62,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
private void init() {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
- Map<Long, HMSExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
- HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,
getHiveMetastoreUris());
- try {
- client = new HiveMetaStoreClient(hiveConf);
- } catch (MetaException e) {
- LOG.warn("Failed to create HiveMetaStoreClient: {}",
e.getMessage());
- return;
- }
+ Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
+ InitCatalogLog initCatalogLog = new InitCatalogLog();
+ initCatalogLog.setCatalogId(id);
+ initCatalogLog.setType(InitCatalogLog.Type.HMS);
List<String> allDatabases;
try {
allDatabases = client.getAllDatabases();
@@ -88,27 +82,53 @@ public class HMSExternalCatalog extends ExternalCatalog {
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
- HMSExternalDatabase db = idToDb.get(dbId);
+ ExternalDatabase db = idToDb.get(dbId);
db.setUnInitialized();
tmpIdToDb.put(dbId, db);
+ initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
- tmpIdToDb.put(dbId, new HMSExternalDatabase(this, dbId,
dbName));
+ HMSExternalDatabase db = new HMSExternalDatabase(this, dbId,
dbName);
+ tmpIdToDb.put(dbId, db);
+ initCatalogLog.addCreateDb(dbId, dbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
+ initialized = true;
+ Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
/**
* Catalog can't be init when creating because the external catalog may
depend on third system.
* So you have to make sure the client of third system is initialized
before any method was called.
*/
- private synchronized void makeSureInitialized() {
+ @Override
+ public synchronized void makeSureInitialized() {
+ if (!objectCreated) {
+ try {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,
getHiveMetastoreUris());
+ client = new HiveMetaStoreClient(hiveConf);
+ objectCreated = true;
+ } catch (MetaException e) {
+ LOG.warn("Failed to create HiveMetaStoreClient: {}",
e.getMessage());
+ return;
+ }
+ }
if (!initialized) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(id, -1, -1);
+ } catch (Exception e) {
+ LOG.warn("Failed to forward init catalog {} operation to
master. {}", name, e.getMessage());
+ }
+ return;
+ }
init();
- initialized = true;
}
}
@@ -120,10 +140,18 @@ public class HMSExternalCatalog extends ExternalCatalog {
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
- try {
- return client.getAllTables(getRealTableName(dbName));
- } catch (MetaException e) {
- LOG.warn("List Table Names failed. {}", e.getMessage());
+ makeSureInitialized();
+ HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase)
idToDb.get(dbNameToId.get(dbName));
+ if (hmsExternalDatabase != null &&
hmsExternalDatabase.isInitialized()) {
+ ArrayList<String> names = Lists.newArrayList();
+ hmsExternalDatabase.getTables().stream().forEach(table ->
names.add(table.getName()));
+ return names;
+ } else {
+ try {
+ return client.getAllTables(getRealTableName(dbName));
+ } catch (MetaException e) {
+ LOG.warn("List Table Names failed. {}", e.getMessage());
+ }
}
return Lists.newArrayList();
}
@@ -161,4 +189,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
makeSureInitialized();
return Lists.newArrayList(dbNameToId.values());
}
+
+    /**
+     * Returns the database registered under {@code dbId} without triggering catalog
+     * initialization (it does not call makeSureInitialized()).
+     *
+     * @param dbId id of the database
+     * @return the database, or {@code null} if no database has that id
+     */
+    @Override
+    public ExternalDatabase getDbForReplay(long dbId) {
+        return idToDb.get(dbId);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
new file mode 100644
index 0000000000..48deec84e6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Journal payload recorded when an external catalog is initialized. It lists the ids of
+ * databases that were refreshed and the ids and names of databases that were newly created,
+ * together with the catalog type. Serialized to the edit log as a single JSON string.
+ */
+@Data
+public class InitCatalogLog implements Writable {
+    /** Kind of external catalog the entry belongs to. */
+    enum Type {
+        HMS,
+        ES,
+        UNKNOWN;
+    }
+
+    @SerializedName("catalogId")
+    private long catalogId = 0;
+
+    // Number of entries in refreshDbIds.
+    @SerializedName("refreshCount")
+    private int refreshCount = 0;
+
+    // Number of entries in createDbIds / createDbNames.
+    @SerializedName("createCount")
+    private int createCount = 0;
+
+    @SerializedName("refreshDbIds")
+    private List<Long> refreshDbIds = Lists.newArrayList();
+
+    // createDbIds and createDbNames are parallel lists: index i describes one created database.
+    @SerializedName("createDbIds")
+    private List<Long> createDbIds = Lists.newArrayList();
+
+    @SerializedName("createDbNames")
+    private List<String> createDbNames = Lists.newArrayList();
+
+    @SerializedName("type")
+    private Type type = Type.UNKNOWN;
+
+    /** Creates an empty log: zero counts, empty lists and type UNKNOWN. */
+    public InitCatalogLog() {
+    }
+
+    /** Records that the existing database {@code id} was refreshed. */
+    public void addRefreshDb(long id) {
+        refreshDbIds.add(id);
+        refreshCount++;
+    }
+
+    /** Records that a database with the given id and name was created. */
+    public void addCreateDb(long id, String name) {
+        createDbIds.add(id);
+        createDbNames.add(name);
+        createCount++;
+    }
+
+    /** Serializes this object to JSON and writes it as one string. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    /** Reads one JSON string from {@code in} and deserializes it into an InitCatalogLog. */
+    public static InitCatalogLog read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), InitCatalogLog.class);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
new file mode 100644
index 0000000000..eade3d12b8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Journal payload recorded when an external database is initialized. It lists the ids of
+ * tables that were refreshed and the ids and names of tables that were newly created,
+ * together with the catalog type. Serialized to the edit log as a single JSON string.
+ */
+@Data
+public class InitDatabaseLog implements Writable {
+    /** Kind of external catalog the database belongs to. */
+    public enum Type {
+        HMS,
+        ES,
+        UNKNOWN;
+    }
+
+    @SerializedName("catalogId")
+    private long catalogId = 0;
+
+    @SerializedName("dbId")
+    private long dbId = 0;
+
+    // Number of entries in refreshTableIds.
+    @SerializedName("refreshCount")
+    private int refreshCount = 0;
+
+    // Number of entries in createTableIds / createTableNames.
+    @SerializedName("createCount")
+    private int createCount = 0;
+
+    @SerializedName("refreshTableIds")
+    private List<Long> refreshTableIds = Lists.newArrayList();
+
+    // createTableIds and createTableNames are parallel lists: index i describes one created table.
+    @SerializedName("createTableIds")
+    private List<Long> createTableIds = Lists.newArrayList();
+
+    @SerializedName("createTableNames")
+    private List<String> createTableNames = Lists.newArrayList();
+
+    @SerializedName("type")
+    private Type type = Type.UNKNOWN;
+
+    /** Creates an empty log: zero ids and counts, empty lists and type UNKNOWN. */
+    public InitDatabaseLog() {
+    }
+
+    /** Records that the existing table {@code id} was refreshed. */
+    public void addRefreshTable(long id) {
+        refreshTableIds.add(id);
+        refreshCount++;
+    }
+
+    /** Records that a table with the given id and name was created. */
+    public void addCreateTable(long id, String name) {
+        createTableIds.add(id);
+        createTableNames.add(name);
+        createCount++;
+    }
+
+    /** Serializes this object to JSON and writes it as one string. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    /** Reads one JSON string from {@code in} and deserializes it into an InitDatabaseLog. */
+    public static InitDatabaseLog read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), InitDatabaseLog.class);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
new file mode 100644
index 0000000000..2f462b551c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Journal payload recorded when an external table is initialized. It carries the ids that
+ * locate the table (catalog, database, table) and the table's column schema.
+ * Serialized to the edit log as a single JSON string.
+ */
+@Data
+public class InitTableLog implements Writable {
+    /** Kind of external catalog the table belongs to. */
+    enum Type {
+        HMS,
+        ES,
+        UNKNOWN;
+    }
+
+    @SerializedName(value = "catalogId")
+    private long catalogId;
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+
+    @SerializedName(value = "tableId")
+    private long tableId;
+
+    // Defaults to UNKNOWN, as InitCatalogLog and InitDatabaseLog do, so the field is never
+    // null when no type has been set explicitly.
+    @SerializedName(value = "type")
+    private Type type = Type.UNKNOWN;
+
+    // A plain private field like the others in this class; it is accessed only through the
+    // Lombok-generated getSchema()/setSchema().
+    @SerializedName(value = "schema")
+    private List<Column> schema;
+
+    /** Creates an empty log; callers fill it in through the generated setters. */
+    public InitTableLog() {}
+
+    /** Serializes this object to JSON and writes it as one string. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    /** Reads one JSON string from {@code in} and deserializes it into an InitTableLog. */
+    public static InitTableLog read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, InitTableLog.class);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index 5e9b25ffc5..cbf827b429 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -37,6 +37,7 @@ import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
@@ -370,6 +371,7 @@ public class EsUtil {
column.setName(key);
column.setIsKey(true);
column.setIsAllowNull(true);
+ column.setUniqueId((int) Env.getCurrentEnv().getNextId());
if (arrayFields.contains(key)) {
column.setType(ArrayType.create(type, true));
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 11e8ca03dd..dc48ed62a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -37,6 +37,10 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.datasource.CatalogLog;
+import org.apache.doris.datasource.ExternalObjectLog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteInfo;
@@ -685,6 +689,27 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_INIT_CATALOG: {
+ data = InitCatalogLog.read(in);
+ isRead = true;
+ break;
+ }
+ case OperationType.OP_INIT_EXTERNAL_DB: {
+ data = InitDatabaseLog.read(in);
+ isRead = true;
+ break;
+ }
+ case OperationType.OP_INIT_EXTERNAL_TABLE: {
+ data = InitTableLog.read(in);
+ isRead = true;
+ break;
+ }
+ case OperationType.OP_REFRESH_EXTERNAL_DB:
+ case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
+ data = ExternalObjectLog.read(in);
+ isRead = true;
+ break;
+ }
case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: {
data = TableAddOrDropColumnsInfo.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 09ec9c24ff..39bd5e7dfd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -42,6 +42,10 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.datasource.CatalogLog;
+import org.apache.doris.datasource.ExternalObjectLog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalCursor;
@@ -926,6 +930,31 @@ public class EditLog {
env.getAuth().replayAlterUser(log);
break;
}
+ case OperationType.OP_INIT_CATALOG: {
+ final InitCatalogLog log = (InitCatalogLog)
journal.getData();
+ env.getCatalogMgr().replayInitCatalog(log);
+ break;
+ }
+ case OperationType.OP_REFRESH_EXTERNAL_DB: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayRefreshExternalDb(log);
+ break;
+ }
+ case OperationType.OP_INIT_EXTERNAL_DB: {
+ final InitDatabaseLog log = (InitDatabaseLog)
journal.getData();
+ env.getCatalogMgr().replayInitExternalDb(log);
+ break;
+ }
+ case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayRefreshExternalTable(log);
+ break;
+ }
+ case OperationType.OP_INIT_EXTERNAL_TABLE: {
+ final InitTableLog log = (InitTableLog) journal.getData();
+ env.getCatalogMgr().replayInitExternalTable(log);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1581,6 +1610,26 @@ public class EditLog {
logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
}
+ /** Records an OP_INIT_CATALOG operation with {@code log} as its payload (delegates to logEdit). */
+ public void logInitCatalog(InitCatalogLog log) {
+ logEdit(OperationType.OP_INIT_CATALOG, log);
+ }
+
+ /** Records an OP_REFRESH_EXTERNAL_DB operation with {@code log} as its payload (delegates to logEdit). */
+ public void logRefreshExternalDb(ExternalObjectLog log) {
+ logEdit(OperationType.OP_REFRESH_EXTERNAL_DB, log);
+ }
+
+ /** Records an OP_INIT_EXTERNAL_DB operation with {@code log} as its payload (delegates to logEdit). */
+ public void logInitExternalDb(InitDatabaseLog log) {
+ logEdit(OperationType.OP_INIT_EXTERNAL_DB, log);
+ }
+
+ /** Records an OP_REFRESH_EXTERNAL_TABLE operation with {@code log} as its payload (delegates to logEdit). */
+ public void logRefreshExternalTable(ExternalObjectLog log) {
+ logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
+ }
+
+ /** Records an OP_INIT_EXTERNAL_TABLE operation with {@code log} as its payload (delegates to logEdit). */
+ public void logInitExternalTable(InitTableLog log) {
+ logEdit(OperationType.OP_INIT_EXTERNAL_TABLE, log);
+ }
+
public Journal getJournal() {
return this.journal;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 3ff1473071..82304acb23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -239,6 +239,11 @@ public class OperationType {
public static final short OP_ALTER_CATALOG_NAME = 322;
public static final short OP_ALTER_CATALOG_PROPS = 323;
public static final short OP_REFRESH_CATALOG = 324;
+ public static final short OP_INIT_CATALOG = 325;
+ public static final short OP_REFRESH_EXTERNAL_DB = 326;
+ public static final short OP_INIT_EXTERNAL_DB = 327;
+ public static final short OP_REFRESH_EXTERNAL_TABLE = 328;
+ public static final short OP_INIT_EXTERNAL_TABLE = 329;
// scheduler job and task 330-350
public static final short OP_CREATE_MTMV_JOB = 330;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 3bee6d59ff..30fe5fdcc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -21,6 +21,7 @@ import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.RollupJobV2;
import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.JdbcResource;
@@ -32,6 +33,13 @@ import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.EsExternalTable;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
@@ -140,7 +148,6 @@ public class GsonUtils {
= RuntimeTypeAdapterFactory.of(LoadJobStateUpdateInfo.class,
"clazz")
.registerSubtype(SparkLoadJobStateUpdateInfo.class,
SparkLoadJobStateUpdateInfo.class.getSimpleName());
-
// runtime adapter for class "Policy"
private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory
= RuntimeTypeAdapterFactory.of(
Policy.class, "clazz").registerSubtype(RowPolicy.class,
RowPolicy.class.getSimpleName())
@@ -152,6 +159,18 @@ public class GsonUtils {
.registerSubtype(HMSExternalCatalog.class,
HMSExternalCatalog.class.getSimpleName())
.registerSubtype(EsExternalCatalog.class,
EsExternalCatalog.class.getSimpleName());
+ // runtime adapter for class "DatabaseIf": each external database subtype is registered
+ // under its simple class name, with "clazz" passed as the type-field name.
+ private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory
= RuntimeTypeAdapterFactory.of(
+ DatabaseIf.class, "clazz")
+ .registerSubtype(ExternalDatabase.class,
ExternalDatabase.class.getSimpleName())
+ .registerSubtype(EsExternalDatabase.class,
EsExternalDatabase.class.getSimpleName())
+ .registerSubtype(HMSExternalDatabase.class,
HMSExternalDatabase.class.getSimpleName());
+
+ // runtime adapter for class "TableIf": each external table subtype is registered
+ // under its simple class name, with "clazz" passed as the type-field name.
+ private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory =
RuntimeTypeAdapterFactory.of(
+ TableIf.class, "clazz")
+ .registerSubtype(ExternalTable.class,
ExternalTable.class.getSimpleName())
+ .registerSubtype(EsExternalTable.class,
EsExternalTable.class.getSimpleName())
+ .registerSubtype(HMSExternalTable.class,
HMSExternalTable.class.getSimpleName());
+
// the builder of GSON instance.
// Add any other adapters if necessary.
private static final GsonBuilder GSON_BUILDER = new
GsonBuilder().addSerializationExclusionStrategy(
@@ -165,7 +184,10 @@ public class GsonUtils {
.registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
.registerTypeAdapterFactory(syncJobTypeAdapterFactory)
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
-
.registerTypeAdapterFactory(policyTypeAdapterFactory).registerTypeAdapterFactory(dsTypeAdapterFactory)
+ .registerTypeAdapterFactory(policyTypeAdapterFactory)
+ .registerTypeAdapterFactory(dsTypeAdapterFactory)
+ .registerTypeAdapterFactory(dbTypeAdapterFactory)
+ .registerTypeAdapterFactory(tblTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new
ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new
AtomicBooleanAdapter());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
new file mode 100644
index 0000000000..c3a08d6d50
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
+import org.apache.doris.thrift.TInitExternalCtlMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * The client for Observer FE to forward external datasource object init request to master.
+ * Including init ExternalCatalog, ExternalDatabase and ExternalTable.
+ * This client will wait for the journal ID replayed at this Observer FE before return.
+ */
+public class MasterCatalogExecutor {
+
+    private static final Logger LOG = LogManager.getLogger(MasterCatalogExecutor.class);
+
+    private final ConnectContext ctx;
+    // Used both as the borrow timeout for the pooled client and as the journal wait timeout.
+    private final int waitTimeoutMs;
+
+    /**
+     * Captures the current {@link ConnectContext} and derives the wait timeout from the
+     * session's query timeout (seconds converted to milliseconds).
+     * NOTE(review): assumes a ConnectContext is bound to the calling thread - confirm.
+     */
+    public MasterCatalogExecutor() {
+        ctx = ConnectContext.get();
+        waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+    }
+
+    /**
+     * Asks the master FE to initialize an external catalog, database or table, then waits
+     * until the journal id reported by the master has been observed locally.
+     *
+     * @param catalogId id of the external catalog; always sent
+     * @param dbId id of the database, or -1 to leave it unset in the request
+     * @param tableId id of the table, or -1 to leave it unset in the request
+     * @throws Exception if the environment is not ready, no master client can be borrowed,
+     *         the RPC fails, or waiting on the journal fails
+     */
+    public void forward(long catalogId, long dbId, long tableId) throws Exception {
+        if (!ctx.getEnv().isReady()) {
+            throw new Exception("Current catalog is not ready, please wait for a while.");
+        }
+        String masterHost = ctx.getEnv().getMasterIp();
+        int masterRpcPort = ctx.getEnv().getMasterRpcPort();
+        TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
+
+        FrontendService.Client client = null;
+        try {
+            client = ClientPool.frontendPool.borrowObject(thriftAddress, waitTimeoutMs);
+        } catch (Exception e) {
+            throw new Exception("Failed to get master client.", e);
+        }
+        TInitExternalCtlMetaRequest request = new TInitExternalCtlMetaRequest();
+        request.setCatalogId(catalogId);
+        if (dbId != -1) {
+            request.setDbId(dbId);
+        }
+        if (tableId != -1) {
+            request.setTableId(tableId);
+        }
+
+        TInitExternalCtlMetaResult result;
+        boolean isReturnToPool = false;
+        try {
+            result = client.initExternalCtlMeta(request);
+            // The RPC completed, so the connection is healthy regardless of what happens
+            // while waiting for the journal afterwards.
+            isReturnToPool = true;
+        } catch (Exception e) {
+            LOG.warn("Failed to finish forward init operation, please try again. ", e);
+            throw e;
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+            }
+        }
+
+        // Wait only after the client is back in the pool: the wait does not use the
+        // connection, and a failed wait must not invalidate a working client.
+        try {
+            ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);
+        } catch (Exception e) {
+            LOG.warn("Failed to finish forward init operation, please try again. ", e);
+            throw e;
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f6641ccb2..53ee7dd637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
@@ -43,6 +44,7 @@ import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -77,6 +79,8 @@ import org.apache.doris.thrift.TGetStoragePolicy;
import org.apache.doris.thrift.TGetStoragePolicyResult;
import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
+import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
+import org.apache.doris.thrift.TInitExternalCtlMetaResult;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableStatusResult;
import org.apache.doris.thrift.TLoadTxn2PCRequest;
@@ -1061,4 +1065,76 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.debug("refresh storage policy request: {}", result);
return result;
}
+
+    /**
+     * Initializes external catalog metadata on behalf of another FE.
+     *
+     * <p>The level of initialization is chosen by which ids are set in the request:
+     * catalog + db + table initializes the table, catalog + db initializes the database,
+     * and catalog alone initializes the catalog.
+     *
+     * @param request ids of the object to initialize; the catalog id is mandatory
+     * @return result carrying the max journal id read after initialization
+     * @throws TException if the catalog id is missing or the target cannot be resolved
+     */
+    @Override
+    public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException {
+        if (!request.isSetCatalogId()) {
+            // The request identifies the catalog by id, not by name.
+            throw new TException("Catalog id is not set. Init failed.");
+        }
+        if (request.isSetDbId() && request.isSetTableId()) {
+            return initTable(request.catalogId, request.dbId, request.tableId);
+        }
+        if (request.isSetDbId()) {
+            return initDb(request.catalogId, request.dbId);
+        }
+        return initCatalog(request.catalogId);
+    }
+
+    /** Initializes the external catalog with the given id and reports the current max journal id. */
+    private TInitExternalCtlMetaResult initCatalog(long catalogId) throws TException {
+        getExternalCatalogForInit(catalogId).makeSureInitialized();
+        return buildInitOkResult();
+    }
+
+    /** Initializes one external database of the given catalog and reports the current max journal id. */
+    private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TException {
+        getExternalDatabaseForInit(catalogId, dbId).makeSureInitialized();
+        return buildInitOkResult();
+    }
+
+    /** Initializes one external table of the given database and reports the current max journal id. */
+    private TInitExternalCtlMetaResult initTable(long catalogId, long dbId, long tableId)
+            throws TException {
+        DatabaseIf db = getExternalDatabaseForInit(catalogId, dbId);
+        TableIf table = db.getTableNullable(tableId);
+        if (table == null) {
+            throw new TException("table " + tableId + " is null");
+        }
+        if (!(table instanceof ExternalTable)) {
+            throw new TException("Only support forward ExternalTable init operation.");
+        }
+
+        ((ExternalTable) table).makeSureInitialized();
+        return buildInitOkResult();
+    }
+
+    /** Looks up a catalog by id and rejects anything that is not an ExternalCatalog. */
+    private ExternalCatalog getExternalCatalogForInit(long catalogId) throws TException {
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new TException("Only support forward ExternalCatalog init operation.");
+        }
+        return (ExternalCatalog) catalog;
+    }
+
+    /** Resolves a database inside an external catalog and rejects missing or non-external ones. */
+    private ExternalDatabase getExternalDatabaseForInit(long catalogId, long dbId) throws TException {
+        CatalogIf catalog = getExternalCatalogForInit(catalogId);
+        DatabaseIf db = catalog.getDbNullable(dbId);
+        if (db == null) {
+            throw new TException("database " + dbId + " is null");
+        }
+        if (!(db instanceof ExternalDatabase)) {
+            throw new TException("Only support forward ExternalDatabase init operation.");
+        }
+        return (ExternalDatabase) db;
+    }
+
+    /** Builds the "OK" result; the max journal id is read at call time, i.e. after initialization. */
+    private TInitExternalCtlMetaResult buildInitOkResult() {
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 871870178a..d1564261e6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -27,7 +27,13 @@ import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.ShowCatalogStmt;
import org.apache.doris.analysis.SwitchStmt;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.EsExternalTable;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@@ -37,6 +43,7 @@ import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.utframe.TestWithFeService;
+import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -95,6 +102,15 @@ public class CatalogMgrTest extends TestWithFeService {
rootCtx);
env.getCatalogMgr().createCatalog(iceBergCatalog);
+ // create es catalog
+ CreateCatalogStmt esCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt(
+ "create catalog es properties('type' = 'es',
'elasticsearch.hosts' = 'http://192.168.0.1');",
+ rootCtx);
+ env.getCatalogMgr().createCatalog(esCatalog);
+
+ createDbAndTableForCatalog(env.getCatalogMgr().getCatalog("hive"));
+ createDbAndTableForCatalog(env.getCatalogMgr().getCatalog("es"));
+
// switch to hive.
SwitchStmt switchHive = (SwitchStmt) parseAndAnalyzeStmt("switch
hive;", rootCtx);
env.changeCatalog(rootCtx, switchHive.getCatalogName());
@@ -109,6 +125,26 @@ public class CatalogMgrTest extends TestWithFeService {
user2.analyze(SystemInfoService.DEFAULT_CLUSTER);
}
+    /**
+     * Registers one test database holding a single table on the given catalog.
+     * The table has a one-column schema ("k1" INT). Only HMS and ES external catalogs
+     * are handled; any other catalog is left untouched.
+     */
+    private void createDbAndTableForCatalog(CatalogIf catalog) {
+        List<Column> schema = Lists.newArrayList();
+        schema.add(new Column("k1", PrimitiveType.INT));
+        if (catalog instanceof HMSExternalCatalog) {
+            HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
+            HMSExternalDatabase db = new HMSExternalDatabase(hmsCatalog, 10000, "hive_db1");
+            HMSExternalTable tbl = new HMSExternalTable(10001, "hive_tbl1", "hive_db1", hmsCatalog);
+            tbl.setNewFullSchema(schema);
+            db.addTableForTest(tbl);
+            hmsCatalog.addDatabaseForTest(db);
+        } else if (catalog instanceof EsExternalCatalog) {
+            // Test the concrete ES type: the cast below would throw ClassCastException
+            // for any other ExternalCatalog implementation.
+            EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
+            EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1");
+            // NOTE(review): the third argument is "es_tbl1" while the HMS branch passes the
+            // database name in that position - confirm whether "es_db1" was intended.
+            EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_tbl1", esCatalog);
+            tbl.setNewFullSchema(schema);
+            db.addTableForTest(tbl);
+            esCatalog.addDatabaseForTest(db);
+        }
+    }
+
@Test
public void testNormalCase() throws Exception {
String createCatalogSql = "CREATE CATALOG hms_catalog "
@@ -119,7 +155,7 @@ public class CatalogMgrTest extends TestWithFeService {
String showCatalogSql = "SHOW CATALOGS";
ShowCatalogStmt showStmt = (ShowCatalogStmt)
parseAndAnalyzeStmt(showCatalogSql);
ShowResultSet showResultSet = mgr.showCatalogs(showStmt);
- Assertions.assertEquals(4, showResultSet.getResultRows().size());
+ Assertions.assertEquals(5, showResultSet.getResultRows().size());
String alterCatalogNameSql = "ALTER CATALOG hms_catalog RENAME " +
MY_CATALOG + ";";
AlterCatalogNameStmt alterNameStmt = (AlterCatalogNameStmt)
parseAndAnalyzeStmt(alterCatalogNameSql);
@@ -151,7 +187,7 @@ public class CatalogMgrTest extends TestWithFeService {
DropCatalogStmt dropCatalogStmt = (DropCatalogStmt)
parseAndAnalyzeStmt(dropCatalogSql);
mgr.dropCatalog(dropCatalogStmt);
showResultSet = mgr.showCatalogs(showStmt);
- Assertions.assertEquals(3, showResultSet.getResultRows().size());
+ Assertions.assertEquals(4, showResultSet.getResultRows().size());
}
private void testCatalogMgrPersist() throws Exception {
@@ -173,7 +209,7 @@ public class CatalogMgrTest extends TestWithFeService {
DataInputStream dis = new DataInputStream(new FileInputStream(file));
CatalogMgr mgr2 = CatalogMgr.read(dis);
- Assert.assertEquals(4, mgr2.listCatalogs().size());
+ Assert.assertEquals(5, mgr2.listCatalogs().size());
Assert.assertEquals(myCatalog.getId(),
mgr2.getCatalog(MY_CATALOG).getId());
Assert.assertEquals(0, mgr2.getInternalCatalog().getId());
Assert.assertEquals(0,
mgr2.getCatalog(InternalCatalog.INTERNAL_DS_ID).getId());
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index b3c5fe7e29..5d48c1ae7e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -678,6 +678,17 @@ struct TWaitingTxnStatusResult {
2: optional i32 txn_status_id
}
+// Request for FrontendService.initExternalCtlMeta. The handler requires catalogId;
+// dbId and tableId are set only when a database or a table is the target.
+struct TInitExternalCtlMetaRequest {
+ 1: optional i64 catalogId
+ 2: optional i64 dbId
+ 3: optional i64 tableId
+}
+
+// Result of FrontendService.initExternalCtlMeta. maxJournalId is the handler's max journal
+// id after initialization (the forwarding FE waits on it); status is "OK" on success.
+struct TInitExternalCtlMetaResult {
+ 1: optional i64 maxJournalId;
+ 2: optional string status;
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -713,4 +724,5 @@ service FrontendService {
TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
AgentService.TGetStoragePolicyResult refreshStoragePolicy()
+ TInitExternalCtlMetaResult initExternalCtlMeta(1:
TInitExternalCtlMetaRequest request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]