This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new d250200aba [enhancement](table-meta) flush column unique ids for
tables before 1.2 automatically (#23616) (#23991)
d250200aba is described below
commit d250200abaecac5715d7484b041aca1c8767528d
Author: Siyang Tang <[email protected]>
AuthorDate: Wed Sep 6 19:23:31 2023 +0800
[enhancement](table-meta) flush column unique ids for tables before 1.2
automatically (#23616) (#23991)
---
docs/en/docs/admin-manual/config/fe-config.md | 7 +
.../SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md | 62 +++++++
docs/zh-CN/docs/admin-manual/config/fe-config.md | 6 +
.../SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md | 62 +++++++
.../main/java/org/apache/doris/common/Config.java | 10 ++
fe/fe-core/src/main/cup/sql_parser.cup | 7 +-
.../doris/alter/AlterLightSchChangeHelper.java | 47 +++---
.../apache/doris/alter/SchemaChangeHandler.java | 9 +-
.../apache/doris/analysis/ShowConvertLSCStmt.java | 69 ++++++++
.../apache/doris/catalog/ColumnIdFlushDaemon.java | 180 +++++++++++++++++++++
.../main/java/org/apache/doris/catalog/Env.java | 8 +
.../java/org/apache/doris/qe/ShowExecutor.java | 43 +++++
fe/fe-core/src/main/jflex/sql_scanner.flex | 1 +
13 files changed, 484 insertions(+), 27 deletions(-)
diff --git a/docs/en/docs/admin-manual/config/fe-config.md
b/docs/en/docs/admin-manual/config/fe-config.md
index 7f11a4a99c..eafe9938c0 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2752,3 +2752,10 @@ Default: 4
This variable indicates the number of digits by which to increase the scale of
the result of
division operations performed with the `/` operator.
+
+#### `enable_convert_light_weight_schema_change`
+
+Default: true
+
+Temporary configuration option. When it is enabled, a background thread is started that automatically converts all OLAP tables to light schema change. The conversion results can be viewed with the command `show convert_light_schema_change_process [from db]`, which displays the conversion results of all non-light schema change tables.
+
diff --git
a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md
new file mode 100644
index 0000000000..91918c4452
--- /dev/null
+++
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md
@@ -0,0 +1,62 @@
+---
+{
+ "title": "SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS
+
+### Name
+
+SHOW CONVERT LIGHT SCHEMA CHANGE PROCESS
+
+### Description
+
+This statement shows the progress of converting non-light schema change OLAP tables to light schema change. The config `enable_convert_light_weight_schema_change` must be enabled.
+
+grammar:
+
+```sql
+SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS [FROM DATABASE db]
+```
+
+### Example
+
+1. View the converting process in db named test
+
+ ```sql
+ SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS FROM DATABASE test;
+ ```
+
+2. View the converting process globally
+
+ ```sql
+ SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS;
+ ```
+
+
+### Keywords
+
+ SHOW, CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS
+
+### Best Practice
\ No newline at end of file
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index fc71a939a4..e8be232bfc 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2754,3 +2754,9 @@ show data (其他用法:HELP SHOW DATA)
默认值:4
此变量表示增加与/运算符执行的除法操作结果规模的位数。默认为4。
+
+#### `enable_convert_light_weight_schema_change`
+
+默认值:true
+
+暂时性配置项,开启后会启动后台线程自动将所有的olap表修改为可light schema change,修改结果可通过命令`show convert_light_schema_change_process [from db]` 来查看,将会展示所有非light schema change表的转换结果
\ No newline at end of file
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md
new file mode 100644
index 0000000000..7d084a5515
--- /dev/null
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md
@@ -0,0 +1,62 @@
+---
+{
+ "title": "SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS
+
+### Name
+
+SHOW CONVERT LIGHT SCHEMA CHANGE PROCESS
+
+### Description
+
+用来查看将非light schema change的olap表转换为light schema change表的情况, 需要开启配置`enable_convert_light_weight_schema_change`
+
+语法:
+
+```sql
+SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS [FROM DATABASE db]
+```
+
+### Example
+
+1. 查看在database test上的转换情况
+
+ ```sql
+ SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS FROM DATABASE test;
+ ```
+
+2. 查看全局的转换情况
+
+ ```sql
+ SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS;
+ ```
+
+
+### Keywords
+
+ SHOW, CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS
+
+### Best Practice
\ No newline at end of file
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 705c93b8c3..b95139d3b1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2100,4 +2100,14 @@ public class Config extends ConfigBase {
@ConfField
public static boolean forbid_running_alter_job = false;
+
+ @ConfField
+ public static int table_stats_health_threshold = 80;
+
+ @ConfField(description = {
+ "暂时性配置项,开启后会自动将所有的olap表修改为可light schema change",
+ "temporary config filed, will make all olap tables enable light
schema change"
+ })
+ public static boolean enable_convert_light_weight_schema_change = true;
+
}
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 39fa006140..dd8dceee64 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -640,7 +640,8 @@ terminal String
KW_PREPARE,
KW_EXECUTE,
KW_LINES,
- KW_IGNORE;
+ KW_IGNORE,
+ KW_CONVERT_LSC;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON,
LBRACKET, RBRACKET, LBRACE, RBRACE, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER,
ARROW;
terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -4158,6 +4159,10 @@ show_param ::=
{:
RESULT = new ShowBuildIndexStmt(db, parser.where, orderByClause,
limitClause);
:}
+ | KW_CONVERT_LSC KW_FROM opt_db:db
+ {:
+ RESULT = new ShowConvertLSCStmt(db);
+ :}
;
opt_tmp ::=
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
index 6dab84bf80..1e77b0a5ae 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
@@ -42,8 +41,8 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
@@ -52,6 +51,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -79,12 +79,11 @@ public class AlterLightSchChangeHelper {
* 2. refresh table metadata
* 3. write edit log
*/
- public void enableLightSchemaChange() throws DdlException {
- final Map<Long, PFetchColIdsRequest> params = initParams();
- final AlterLightSchemaChangeInfo info = callForColumnsInfo(params);
+ public void enableLightSchemaChange() throws IllegalStateException {
+ final AlterLightSchemaChangeInfo info = callForColumnsInfo();
updateTableMeta(info);
Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
- LOG.info("successfully enable `light_schema_change`");
+ LOG.info("successfully enable `light_schema_change`, db={}, tbl={}",
db.getFullName(), olapTable.getName());
}
/**
@@ -137,35 +136,36 @@ public class AlterLightSchChangeHelper {
}
/**
- * @param beIdToRequest rpc param for corresponding BEs
* @return contains indexIds to each tablet schema info which consists of
columnName to corresponding
* column unique id pairs
- * @throws DdlException as a wrapper for rpc failures
+ * @throws IllegalStateException as a wrapper for rpc failures
*/
- private AlterLightSchemaChangeInfo callForColumnsInfo(Map<Long,
PFetchColIdsRequest> beIdToRequest)
- throws DdlException {
- final List<Future<PFetchColIdsResponse>> futureList = new
ArrayList<>();
- // start a rpc in a pipeline way
+ public AlterLightSchemaChangeInfo callForColumnsInfo()
+ throws IllegalStateException {
+ Map<Long, PFetchColIdsRequest> beIdToRequest = initParams();
+ Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new
HashMap<>();
try {
for (Long beId : beIdToRequest.keySet()) {
final Backend backend =
Env.getCurrentSystemInfo().getIdToBackend().get(beId);
- final TNetworkAddress address = new
TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ final TNetworkAddress address =
+ new
TNetworkAddress(Objects.requireNonNull(backend).getHost(),
backend.getBrpcPort());
final Future<PFetchColIdsResponse> responseFuture =
BackendServiceProxy.getInstance()
.getColumnIdsByTabletIds(address,
beIdToRequest.get(beId));
- futureList.add(responseFuture);
+ beIdToRespFuture.put(beId, responseFuture);
}
} catch (RpcException e) {
- throw new DdlException("fetch columnIds RPC failed", e);
+ throw new IllegalStateException("fetch columnIds RPC failed", e);
}
// wait for and get results
final long start = System.currentTimeMillis();
long timeoutMs = ConnectContext.get().getExecTimeout() * 1000L;
final List<PFetchColIdsResponse> resultList = new ArrayList<>();
try {
- for (Future<PFetchColIdsResponse> future : futureList) {
- final PFetchColIdsResponse response = future.get(timeoutMs,
TimeUnit.MILLISECONDS);
+ for (Map.Entry<Long, Future<PFetchColIdsResponse>> entry :
beIdToRespFuture.entrySet()) {
+ final PFetchColIdsResponse response =
entry.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
if (response.getStatus().getStatusCode() !=
TStatusCode.OK.getValue()) {
- throw new
DdlException(response.getStatus().getErrorMsgs(0));
+ throw new IllegalStateException(String.format("fail to get
column info from be: %s, msg:%s",
+ entry.getKey(),
response.getStatus().getErrorMsgs(0)));
}
resultList.add(response);
// refresh the timeout
@@ -176,9 +176,9 @@ public class AlterLightSchChangeHelper {
"impossible state, timeout should happened");
}
} catch (InterruptedException | ExecutionException e) {
- throw new DdlException("fetch columnIds RPC result failed: ", e);
+ throw new IllegalStateException("fetch columnIds RPC result
failed: ", e);
} catch (TimeoutException e) {
- throw new DdlException("fetch columnIds RPC result timeout", e);
+ throw new IllegalStateException("fetch columnIds RPC result
timeout", e);
}
return compactToAlterLscInfo(resultList);
}
@@ -207,7 +207,7 @@ public class AlterLightSchChangeHelper {
return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(),
indexIdToTabletInfo);
}
- public void updateTableMeta(AlterLightSchemaChangeInfo info) throws
DdlException {
+ public void updateTableMeta(AlterLightSchemaChangeInfo info) throws
IllegalStateException {
Preconditions.checkNotNull(info, "passed in info should be not null");
// update index-meta once and for all
// schema pair: <maxColId, columns>
@@ -242,10 +242,9 @@ public class AlterLightSchChangeHelper {
indexMeta.setSchema(schemaPair.second);
}
} catch (IOException e) {
- throw new DdlException("fail to reset index schema", e);
+ throw new IllegalStateException("fail to reset index schema", e);
}
// write table property
olapTable.setEnableLightSchemaChange(true);
- LOG.info("successfully update table meta for `light_schema_change`");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 16c3da53d4..3de1d43163 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2014,7 +2014,12 @@ public class SchemaChangeHandler extends AlterHandler {
private void enableLightSchemaChange(Database db, OlapTable olapTable)
throws DdlException {
final AlterLightSchChangeHelper alterLightSchChangeHelper = new
AlterLightSchChangeHelper(db, olapTable);
- alterLightSchChangeHelper.enableLightSchemaChange();
+ try {
+ alterLightSchChangeHelper.enableLightSchemaChange();
+ } catch (IllegalStateException e) {
+ throw new DdlException(String.format("failed to enable light
schema change for table %s.%s",
+ db.getFullName(), olapTable.getName()), e);
+ }
}
public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info)
throws MetaNotFoundException {
@@ -2024,7 +2029,7 @@ public class SchemaChangeHandler extends AlterHandler {
final AlterLightSchChangeHelper alterLightSchChangeHelper = new
AlterLightSchChangeHelper(db, olapTable);
try {
alterLightSchChangeHelper.updateTableMeta(info);
- } catch (DdlException e) {
+ } catch (IllegalStateException e) {
LOG.warn("failed to replay alter light schema change", e);
} finally {
olapTable.writeUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java
new file mode 100644
index 0000000000..bab2e3b34b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+public class ShowConvertLSCStmt extends ShowStmt {
+
+ private final String dbName;
+
+ public ShowConvertLSCStmt(String dbName) {
+ this.dbName = dbName;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+
+ if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)
+ &&
!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
+ PrivPredicate.OPERATOR)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN/OPERATOR");
+ }
+ }
+
+ @Override
+ public ShowResultSetMetaData getMetaData() {
+ ShowResultSetMetaData.Builder builder =
ShowResultSetMetaData.builder();
+ Column databaseColumn = new Column("database",
ScalarType.createVarcharType(30));
+ Column tableNameColumn = new Column("table",
ScalarType.createVarcharType(30));
+ Column statusColum = new Column("status",
ScalarType.createVarcharType(30));
+ builder.addColumn(databaseColumn);
+ builder.addColumn(tableNameColumn);
+ builder.addColumn(statusColum);
+ return builder.build();
+ }
+
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.FORWARD_NO_SYNC;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java
new file mode 100644
index 0000000000..28007a8e66
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.alter.AlterLightSchChangeHelper;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.persist.AlterLightSchemaChangeInfo;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+
+/**
+ * note(tsy): this class is temporary, make table before 1.2 to enable light schema change.
+ *
+ * <p>On every run it walks all databases of the internal catalog and, for each {@link OlapTable}
+ * that does not use light schema change yet, either converts it (when
+ * {@code Config.enable_convert_light_weight_schema_change} was true at construction time) or only
+ * records it as waiting. Outstanding and failed tables are kept in a result collector that can be
+ * read through {@link #getResultCollector()} while holding {@link #readLock()}.
+ */
+public class ColumnIdFlushDaemon extends MasterDaemon {
+
+    private static final Logger LOG = LogManager.getLogger(ColumnIdFlushDaemon.class);
+
+    /**
+     * db name -> (tbl name -> status)
+     */
+    private final Map<String, Map<String, FlushStatus>> resultCollector;
+
+    /** Guards {@link #resultCollector}: written under the write lock in {@link #flush()}. */
+    private final ReadWriteLock rwLock;
+
+    /** Per-table action, fixed at construction time from the config switch. */
+    private final BiConsumer<Database, OlapTable> flushFunc;
+
+    public ColumnIdFlushDaemon() {
+        // NOTE(review): the second argument is presumably the run interval of the daemon - confirm.
+        super("colum-id-flusher", TimeUnit.HOURS.toMillis(1));
+        resultCollector = Maps.newHashMap();
+        rwLock = new ReentrantReadWriteLock();
+        if (Config.enable_convert_light_weight_schema_change) {
+            flushFunc = this::doFlush;
+        } else {
+            // conversion disabled: only report which tables are still pending
+            flushFunc = (db, table) -> record(db.getFullName(), table.getName(), FlushStatus.init());
+        }
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        flush();
+    }
+
+    /**
+     * Visits every database once, applying {@link #flushFunc} to each OLAP table that is not yet
+     * light schema change. The collector write lock is held while one database is processed, and
+     * the thread pauses 3 seconds between databases.
+     */
+    private void flush() {
+        List<Database> dbs = Env.getCurrentEnv().getInternalCatalog().getDbs();
+        for (Database db : dbs) {
+            rwLock.writeLock().lock();
+            try {
+                db.getTables()
+                        .stream()
+                        .filter(table -> table instanceof OlapTable)
+                        .map(table -> (OlapTable) table)
+                        .filter(olapTable -> !usesLightSchemaChange(olapTable))
+                        .forEach(table -> flushFunc.accept(db, table));
+            } finally {
+                rwLock.writeLock().unlock();
+            }
+            try {
+                // avoid too often to call be
+                sleep(3000);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller and stop this sweep instead of
+                // silently carrying on; the remaining databases are handled on the next run.
+                Thread.currentThread().interrupt();
+                LOG.warn("column id flush is interrupted, stop current round");
+                return;
+            }
+        }
+    }
+
+    /**
+     * Null-safe check of the table's light schema change flag.
+     * NOTE(review): assumes getTableProperty() may return null for tables without properties;
+     * such tables are treated as not converted - confirm.
+     */
+    private static boolean usesLightSchemaChange(OlapTable table) {
+        return table.getTableProperty() != null && table.getTableProperty().getUseSchemaLightChange();
+    }
+
+    /**
+     * Converts one table: fetches the column info without holding the table lock, then updates
+     * the table meta and writes the edit log under the table write lock. On success the table is
+     * removed from the collector; on any runtime failure the reason is recorded instead, so that
+     * one broken table cannot abort the sweep over the remaining tables.
+     */
+    private void doFlush(Database db, OlapTable table) {
+        final String dbName = db.getFullName();
+        final String tblName = table.getName();
+        record(dbName, tblName, FlushStatus.init());
+        AlterLightSchChangeHelper schChangeHelper = new AlterLightSchChangeHelper(db, table);
+        AlterLightSchemaChangeInfo changeInfo;
+        try {
+            changeInfo = schChangeHelper.callForColumnsInfo();
+        } catch (RuntimeException e) {
+            // IllegalStateException is the documented failure; anything else is recorded as well
+            recordFailure(dbName, tblName, e);
+            return;
+        }
+        table.writeLock();
+        try {
+            // re-check under the lock: the table may have been converted in the meantime
+            if (usesLightSchemaChange(table)) {
+                removeRecord(dbName, tblName);
+                return;
+            }
+            schChangeHelper.updateTableMeta(changeInfo);
+            Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(changeInfo);
+            LOG.info("successfully enable `light_schema_change`, db={}, tbl={}", dbName, tblName);
+            removeRecord(dbName, tblName);
+        } catch (RuntimeException e) {
+            recordFailure(dbName, tblName, e);
+        } finally {
+            table.writeUnlock();
+        }
+    }
+
+    /** Logs the failure and stores a non-null reason as the table's status. */
+    private void recordFailure(String dbName, String tableName, RuntimeException e) {
+        LOG.warn("failed to enable `light_schema_change`, db={}, tbl={}", dbName, tableName, e);
+        // getMessage() may be null (e.g. NullPointerException); fall back to the exception text
+        String reason = e.getMessage() != null ? e.getMessage() : e.toString();
+        record(dbName, tableName, FlushStatus.failed(reason));
+    }
+
+    /** Stores or overwrites the status of one table. */
+    private void record(String dbName, String tableName, FlushStatus status) {
+        resultCollector.computeIfAbsent(dbName, key -> Maps.newHashMap()).put(tableName, status);
+    }
+
+    /** Drops the status of one table and the database entry once it has no tables left. */
+    private void removeRecord(String dbName, String tableName) {
+        Map<String, FlushStatus> tableToStatus = resultCollector.get(dbName);
+        if (tableToStatus == null) {
+            return;
+        }
+        tableToStatus.remove(tableName);
+        if (tableToStatus.isEmpty()) {
+            resultCollector.remove(dbName);
+        }
+    }
+
+    /**
+     * @return the live (not copied) collector, db name -> (tbl name -> status); callers should
+     *         hold {@link #readLock()} while reading it
+     */
+    public Map<String, Map<String, FlushStatus>> getResultCollector() {
+        return resultCollector;
+    }
+
+    public void readLock() {
+        rwLock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        rwLock.readLock().unlock();
+    }
+
+    /** Immutable status of one table: either waiting for conversion or failed with a reason. */
+    public static class FlushStatus {
+
+        private final boolean success;
+
+        private final String msg;
+
+        private FlushStatus() {
+            this.success = true;
+            this.msg = "Waiting to be converted";
+        }
+
+        private FlushStatus(String msg) {
+            this.success = false;
+            this.msg = msg;
+        }
+
+        /** @return the initial "waiting" status */
+        public static FlushStatus init() {
+            return new FlushStatus();
+        }
+
+        /** @return a failed status carrying {@code reason} as its message */
+        public static FlushStatus failed(String reason) {
+            return new FlushStatus(reason);
+        }
+
+        public boolean isSuccess() {
+            return success;
+        }
+
+        public String getMsg() {
+            return msg;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 554712db8d..9fbed8b63d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -333,6 +333,8 @@ public class Env {
private Daemon timePrinter;
private Daemon listener;
+ private ColumnIdFlushDaemon columnIdFlusher;
+
private boolean isFirstTimeStartUp = false;
private boolean isElectable;
// set to true after finished replay all meta and ready to serve
@@ -674,6 +676,7 @@ public class Env {
this.hiveTransactionMgr = new HiveTransactionMgr();
this.binlogManager = new BinlogManager();
this.binlogGcer = new BinlogGcer();
+ this.columnIdFlusher = new ColumnIdFlushDaemon();
}
public static void destroyCheckpoint() {
@@ -1503,6 +1506,7 @@ public class Env {
// binlog gcer
binlogGcer.start();
+ columnIdFlusher.start();
}
// start threads that should running on all FE
@@ -5505,4 +5509,8 @@ public class Env {
queryStats.clear(info);
editLog.logCleanQueryStats(info);
}
+
+ public ColumnIdFlushDaemon getColumnIdFlusher() {
+ return columnIdFlusher;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 9c3e1b1b31..0ebcb1c7dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -40,6 +40,7 @@ import org.apache.doris.analysis.ShowCollationStmt;
import org.apache.doris.analysis.ShowColumnHistStmt;
import org.apache.doris.analysis.ShowColumnStatsStmt;
import org.apache.doris.analysis.ShowColumnStmt;
+import org.apache.doris.analysis.ShowConvertLSCStmt;
import org.apache.doris.analysis.ShowCreateCatalogStmt;
import org.apache.doris.analysis.ShowCreateDbStmt;
import org.apache.doris.analysis.ShowCreateFunctionStmt;
@@ -110,6 +111,7 @@ import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.blockrule.SqlBlockRule;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ColumnIdFlushDaemon;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DynamicPartitionProperty;
@@ -236,6 +238,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -427,6 +430,8 @@ public class ShowExecutor {
handleShowBuildIndexStmt();
} else if (stmt instanceof ShowAnalyzeTaskStatus) {
handleShowAnalyzeTaskStatus();
+ } else if (stmt instanceof ShowConvertLSCStmt) {
+ handleShowConvertLSC();
} else {
handleEmtpy();
}
@@ -2828,5 +2833,43 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
}
+
+    /**
+     * Builds the result set for ShowConvertLSCStmt: one row of (database, table, status message)
+     * per entry held by the column id flusher, limited to the statement's database when one was
+     * given. The flusher's read lock is held while its collector is read.
+     */
+    private void handleShowConvertLSC() {
+        ShowConvertLSCStmt showStmt = (ShowConvertLSCStmt) stmt;
+        String requestedDb = showStmt.getDbName();
+        ColumnIdFlushDaemon flusher = Env.getCurrentEnv().getColumnIdFlusher();
+        List<List<String>> rows = new ArrayList<>();
+        flusher.readLock();
+        try {
+            Map<String, Map<String, ColumnIdFlushDaemon.FlushStatus>> collected = flusher.getResultCollector();
+            if (requestedDb == null) {
+                // no filter: dump every database
+                for (Map.Entry<String, Map<String, ColumnIdFlushDaemon.FlushStatus>> dbEntry
+                        : collected.entrySet()) {
+                    for (Map.Entry<String, ColumnIdFlushDaemon.FlushStatus> tblEntry
+                            : dbEntry.getValue().entrySet()) {
+                        List<String> row = new ArrayList<>();
+                        row.add(dbEntry.getKey());
+                        row.add(tblEntry.getKey());
+                        row.add(tblEntry.getValue().getMsg());
+                        rows.add(row);
+                    }
+                }
+            } else {
+                Map<String, ColumnIdFlushDaemon.FlushStatus> tblNameToStatus = collected.get(requestedDb);
+                if (tblNameToStatus != null) {
+                    for (Map.Entry<String, ColumnIdFlushDaemon.FlushStatus> tblEntry
+                            : tblNameToStatus.entrySet()) {
+                        List<String> row = new ArrayList<>();
+                        row.add(requestedDb);
+                        row.add(tblEntry.getKey());
+                        row.add(tblEntry.getValue().getMsg());
+                        rows.add(row);
+                    }
+                }
+            }
+        } finally {
+            flusher.readUnlock();
+        }
+        resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+    }
+
+
}
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index cc4c11607b..56f8faae18 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -493,6 +493,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE));
keywordMap.put("expired", new Integer(SqlParserSymbols.KW_EXPIRED));
+ keywordMap.put("convert_light_schema_change_process", new
Integer(SqlParserSymbols.KW_CONVERT_LSC));
}
// map from token id to token description
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]