This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 70521fd7937d752085af24e55d39509b8bbd40c2 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Mon Apr 13 15:15:46 2020 +0800 KYLIN-4421 Add a rest API for updating table & database name --- .../kylin/metadata/TableMetadataManager.java | 27 ++++- .../kylin/metadata/project/ProjectManager.java | 16 +++ .../kylin/rest/controller/TableController.java | 17 +++ .../kylin/rest/request/TableUpdateRequest.java | 44 ++++++++ .../apache/kylin/rest/service/TableService.java | 115 +++++++++++++++++++++ .../service/update/TableSchemaUpdateMapping.java | 3 +- 6 files changed, 217 insertions(+), 5 deletions(-) diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java index 932f631..ec903d4 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java @@ -185,7 +185,7 @@ public class TableMetadataManager { Map<String, TableDesc> ret = new LinkedHashMap<>(); for (String tableName : prjTableNames) { String tableIdentity = getTableIdentity(tableName); - ret.put(tableIdentity, getProjectSpecificTableDesc(tableIdentity, prj)); + ret.put(tableIdentity, getProjectSpecificTableDesc(tableIdentity, prj, true)); } return ret; } @@ -195,8 +195,12 @@ public class TableMetadataManager { * Get TableDesc by name */ public TableDesc getTableDesc(String tableName, String prj) { + return getTableDesc(tableName, prj, true); + } + + public TableDesc getTableDesc(String tableName, String prj, boolean ifUseGlobal) { try (AutoLock lock = srcTableMapLock.lockForWrite()) { - return getProjectSpecificTableDesc(getTableIdentity(tableName), prj); + return getProjectSpecificTableDesc(getTableIdentity(tableName), prj, ifUseGlobal); } } @@ -205,11 +209,11 @@ public class TableMetadataManager { * * All locks on srcTableMapLock are WRITE LOCKS because of this method!! */ - private TableDesc getProjectSpecificTableDesc(String fullTableName, String prj) { + private TableDesc getProjectSpecificTableDesc(String fullTableName, String prj, boolean ifUseGlobal) { String key = mapKey(fullTableName, prj); TableDesc result = srcTableMap.get(key); - if (result == null) { + if (result == null && ifUseGlobal) { try (AutoLock lock = srcTableMapLock.lockForWrite()) { result = srcTableMap.get(mapKey(fullTableName, null)); if (result != null) { @@ -402,6 +406,21 @@ public class TableMetadataManager { } } + public void saveNewTableExtFromOld(String oldTableId, String prj, String newTableId) throws IOException { + try (AutoLock lock = srcExtMapLock.lockForWrite()) { + String path = TableExtDesc.concatResourcePath(oldTableId, prj); + ResourceStore store = getStore(); + TableExtDesc newTableExt = store.getResource(path, TABLE_EXT_SERIALIZER); + if (newTableExt != null) { + newTableExt.setIdentity(newTableId); + newTableExt.setLastModified(0L); + + newTableExt.init(prj); + srcExtCrud.save(newTableExt); + } + } + } + private TableExtDesc convertOldTableExtToNewer(String resourceName) { ResourceStore store = getStore(); Map<String, String> attrs = Maps.newHashMap(); diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index c4ff1e0..15b6a2d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -333,6 +333,22 @@ public class ProjectManager { } } + public void removeTableDescFromProject(String[] tableIdentities, String projectName) throws IOException { + try (AutoLock lock = prjMapLock.lockForWrite()) { + TableMetadataManager metaMgr = getTableManager(); + ProjectInstance projectInstance = getProject(projectName); + for (String tableId : tableIdentities) { + TableDesc table = metaMgr.getTableDesc(tableId, projectName); + if (table == null) { + throw new IllegalStateException("Cannot find table '" + tableId + "' in metadata manager"); + } + projectInstance.removeTable(table.getIdentity()); + } + + save(projectInstance); + } + } + public ProjectInstance addExtFilterToProject(String[] filters, String projectName) throws IOException { try (AutoLock lock = prjMapLock.lockForWrite()) { TableMetadataManager metaMgr = getTableManager(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java index 2f9c07c..6c529b9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -29,10 +29,12 @@ import java.util.Set; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.exception.NotFoundException; import org.apache.kylin.rest.request.CardinalityRequest; import org.apache.kylin.rest.request.HiveTableRequest; +import org.apache.kylin.rest.request.TableUpdateRequest; import org.apache.kylin.rest.response.TableSnapshotResponse; import org.apache.kylin.rest.service.TableACLService; import org.apache.kylin.rest.service.TableService; @@ -133,6 +135,21 @@ public class TableController extends BasicController { return result; } + @RequestMapping(value = "/{project}/update", method = { RequestMethod.POST }, produces = { "application/json" }) + @ResponseBody + public void updateHiveTables(@PathVariable String project, @RequestBody TableUpdateRequest request) + throws IOException { + try { + tableService.updateHiveTable(project, request.getMapping(), request.isUseExisting()); + } catch (BadRequestException e) { + logger.error("Failed to update Hive Table", e); + throw e; + } catch (Throwable e) { + logger.error("Failed to update Hive Table", e); + throw new InternalErrorException(e.getLocalizedMessage(), e); + } + } + @RequestMapping(value = "/{tables}/{project}", method = { RequestMethod.DELETE }, produces = { "application/json" }) @ResponseBody public Map<String, String[]> unLoadHiveTables(@PathVariable String tables, @PathVariable String project) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/TableUpdateRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/TableUpdateRequest.java new file mode 100644 index 0000000..f645b52 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/request/TableUpdateRequest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.rest.request; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.kylin.rest.service.update.TableSchemaUpdateMapping; + +public class TableUpdateRequest implements Serializable { + private Map<String, TableSchemaUpdateMapping> mapping; + private boolean isUseExisting; + + public Map<String, TableSchemaUpdateMapping> getMapping() { + return mapping; + } + + public void setMapping(Map<String, TableSchemaUpdateMapping> mapping) { + this.mapping = mapping; + } + + public boolean isUseExisting() { + return isUseExisting; + } + + public void setIsUseExisting(boolean useExisting) { + isUseExisting = useExisting; + } +} diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index ea0b8b3..cd18d2b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -27,7 +27,9 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -39,6 +41,8 @@ import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState; @@ -54,6 +58,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; @@ -63,6 +68,8 @@ import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.response.TableDescResponse; import org.apache.kylin.rest.response.TableSnapshotResponse; +import org.apache.kylin.rest.service.update.TableSchemaUpdateMapping; +import org.apache.kylin.rest.service.update.TableSchemaUpdater; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.IReadableTable.TableSignature; @@ -84,6 +91,7 @@ import org.apache.kylin.shaded.com.google.common.collect.LinkedHashMultimap; import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.shaded.com.google.common.collect.SetMultimap; +import org.apache.kylin.shaded.com.google.common.collect.Sets; @Component("tableService") public class TableService extends BasicService { @@ -91,6 +99,10 @@ public class TableService extends BasicService { private static final Logger logger = LoggerFactory.getLogger(TableService.class); @Autowired + @Qualifier("cubeMgmtService") + private CubeService cubeService; + + @Autowired @Qualifier("modelMgmtService") private ModelService modelService; @@ -531,4 +543,107 @@ public class TableService extends BasicService { String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); return (dbTableName[0] + "." + dbTableName[1]).toUpperCase(Locale.ROOT); } + + /** + * 1. Check whether it's able to do the change + * - related cube instance should be disabled + * 2. Get all influenced metadata + * - table + * - project + * - model + * - cube desc + * - cube instance + * 3. Update the metadata + * 4. Save the updated metadata + * - table + * - project + * - model + * - cube desc + * - cube instance + * 5. Delete dirty table metadata + */ + public void updateHiveTable(String projectName, Map<String, TableSchemaUpdateMapping> mapping, boolean isUseExisting) throws IOException { + final ProjectInstance prjInstance = getProjectManager().getProject(projectName); + if (prjInstance == null) { + throw new BadRequestException("Project " + projectName + " does not exist"); + } + // To deal with case sensitive issue for table resource path + final String project = prjInstance.getName(); + aclEvaluate.checkProjectWritePermission(project); + + // Check whether it's able to do the change + Set<CubeInstance> infCubes = cubeService.listAllCubes(project).stream() + .filter(cube -> isTablesUsed(cube.getModel(), mapping.keySet())).collect(Collectors.toSet()); + Set<CubeInstance> readyCubeSet = infCubes.stream().filter(cube -> cube.isReady()).collect(Collectors.toSet()); + if (!readyCubeSet.isEmpty()) { + throw new BadRequestException("Influenced cubes " + readyCubeSet + " should be disabled"); + } + + // Get influenced metadata and update the metadata + Map<String, TableDesc> newTables = mapping.keySet().stream() + .map(t -> getTableManager().getTableDesc(t, project)).collect(Collectors.toMap(t -> t.getIdentity(), + t -> TableSchemaUpdater.dealWithMappingForTable(t, mapping))); + Map<String, String> existingTables = newTables.entrySet().stream() + .filter(t -> getTableManager().getTableDesc(t.getValue().getIdentity(), project, false) != null) + .collect(Collectors.toMap(t -> t.getKey(), t -> t.getValue().getIdentity())); + if (!existingTables.isEmpty()) { + if (isUseExisting) { + logger.info("Will use existing tables {}", existingTables.values()); + } else { + throw new BadRequestException("Tables " + existingTables.values() + " already exist"); + } + } + Map<String, DataModelDesc> newModels = prjInstance.getModels().stream() + .map(m -> getDataModelManager().getDataModelDesc(m)).filter(m -> isTablesUsed(m, mapping.keySet())) + .map(m -> TableSchemaUpdater.dealWithMappingForModel(m, mapping)) + .collect(Collectors.toMap(m -> m.getName(), m -> m)); + + Map<String, CubeDesc> newCubeDescs = infCubes.stream() + .map(cube -> TableSchemaUpdater.dealWithMappingForCubeDesc(cube.getDescriptor(), mapping)) + .collect(Collectors.toMap(cube -> cube.getName(), cube -> cube)); + Map<String, CubeInstance> newCubes = infCubes.stream() + .map(cube -> TableSchemaUpdater.dealWithMappingForCube(cube, mapping)) + .collect(Collectors.toMap(cube -> cube.getName(), cube -> cube)); + + // Save the updated metadata + // -- 1. table & table_ext + for (Map.Entry<String, TableDesc> entry : newTables.entrySet()) { + if (existingTables.containsKey(entry.getKey())) { + continue; + } + getTableManager().saveNewTableExtFromOld(entry.getKey(), project, entry.getValue().getIdentity()); + getTableManager().saveSourceTable(entry.getValue(), project); + } + // -- 2. project + Set<String> newTableNames = newTables.values().stream().map(t -> t.getIdentity()).collect(Collectors.toSet()); + getProjectManager().addTableDescToProject(newTableNames.toArray(new String[0]), project); + // -- 3. model + for (Map.Entry<String, DataModelDesc> entry : newModels.entrySet()) { + getDataModelManager().updateDataModelDesc(entry.getValue()); + } + // -- 4. cube_desc & cube instance + for (Map.Entry<String, CubeDesc> entry : newCubeDescs.entrySet()) { + getCubeDescManager().updateCubeDesc(entry.getValue()); + } + for (Map.Entry<String, CubeInstance> entry : newCubes.entrySet()) { + CubeUpdate update = new CubeUpdate(entry.getValue()); + getCubeManager().updateCube(update); + } + + // Delete dirty table metadata + Set<String> oldTables = Sets.newHashSet(newTables.keySet()); + oldTables.removeAll(existingTables.values()); + getProjectManager().removeTableDescFromProject(oldTables.toArray(new String[0]), project); + for (String entry : newTables.keySet()) { + getTableManager().removeTableExt(entry, project); + getTableManager().removeSourceTable(entry, project); + } + } + + private static boolean isTablesUsed(DataModelDesc model, Set<String> tables) { + Set<String> usingTables = model.getAllTables().stream().map(t -> t.getTableIdentity()) + .collect(Collectors.toSet()); + usingTables.retainAll(tables); + return !usingTables.isEmpty(); + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/update/TableSchemaUpdateMapping.java b/server-base/src/main/java/org/apache/kylin/rest/service/update/TableSchemaUpdateMapping.java index 62b4a35..7b7f878 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/update/TableSchemaUpdateMapping.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/update/TableSchemaUpdateMapping.java @@ -18,12 +18,13 @@ package org.apache.kylin.rest.service.update; +import java.io.Serializable; import java.util.Locale; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -public class TableSchemaUpdateMapping { +public class TableSchemaUpdateMapping implements Serializable { private String database;