okumin commented on code in PR #5771:
URL: https://github.com/apache/hive/pull/5771#discussion_r2051677647
########## standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/client/HiveMetaStoreClientWithHook.java:
##########
@@ -0,0 +1,1250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.client;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
+import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreFilterHook;
+import org.apache.hadoop.hive.metastore.PartitionDropOptions;
+import org.apache.hadoop.hive.metastore.TableIterable;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
+import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.utils.FilterUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
+import static org.apache.hadoop.hive.metastore.client.ThriftHiveMetaStoreClient.SNAPSHOT_REF;
+import static org.apache.hadoop.hive.metastore.client.ThriftHiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+public class HiveMetaStoreClientWithHook extends NoopHiveMetaStoreClientDelegator implements IMetaStoreClient {
+  private final Configuration conf;
+  private final HiveMetaHookLoader hookLoader;
+  private final MetaStoreFilterHook filterHook;
+  private final boolean isClientFilterEnabled;
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClientWithHook.class);
+
+  public HiveMetaStoreClientWithHook(Configuration conf, HiveMetaHookLoader hookLoader,
+      IMetaStoreClient delegate) {
+    super(delegate);
+    if (conf == null) {
+      conf = MetastoreConf.newMetastoreConf();
+      this.conf = conf;
+    } else {
+      this.conf = new Configuration(conf);
+    }
+
+    this.hookLoader = hookLoader;
+
+    filterHook = loadFilterHooks();
+    isClientFilterEnabled = getIfClientFilterEnabled();
+  }
+
+  private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException {
+    Class<? extends MetaStoreFilterHook> authProviderClass =
+        MetastoreConf.getClass(
+            conf,
+            MetastoreConf.ConfVars.FILTER_HOOK, DefaultMetaStoreFilterHookImpl.class,
+            MetaStoreFilterHook.class);
+    String msg = "Unable to create instance of " + authProviderClass.getName() + ": ";
+    try {
+      Constructor<? extends MetaStoreFilterHook> constructor =
+          authProviderClass.getConstructor(Configuration.class);
+      return constructor.newInstance(conf);
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InstantiationException |
+        IllegalArgumentException | InvocationTargetException e) {
+      throw new IllegalStateException(msg + e.getMessage(), e);
+    }
+  }
+
+  private boolean getIfClientFilterEnabled() {
+    boolean isEnabled =
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_FILTER_ENABLED);
+    LOG.info("HMS client filtering is " + (isEnabled ? "enabled." : "disabled."));
+    return isEnabled;
+  }
+
+  private HiveMetaHook getHook(Table tbl) throws MetaException {
+    if (hookLoader == null) {
+      return null;
+    }
+    return hookLoader.getHook(tbl);
+  }
+
+  // SG:FIXME for alterTable, createTable, dropDatabase, etc. families
+  // Option 1. Keep the HMSC style: wrap only one method by hook,
+  //           and the other methods eventually calls the wrapped one
+  // Option 2. Wrap each delegating call with hook
+  // Option 3. Implement a wrapper method something like Spark's withXXX methods.
+
+  @Override
+  public void alter_table(String dbname, String tbl_name, Table new_tbl) throws TException {
+    alter_table_with_environmentContext(dbname, tbl_name, new_tbl, null);
+  }
+
+  @Override
+  public void alter_table(String defaultDatabaseName, String tblName, Table table,
+      boolean cascade) throws TException {
+    EnvironmentContext environmentContext = new EnvironmentContext();
+    if (cascade) {
+      environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
+    }
+    alter_table_with_environmentContext(defaultDatabaseName, tblName, table, environmentContext);
+  }
+
+  @Override
+  public void alter_table_with_environmentContext(String dbname, String tbl_name, Table new_tbl,
+      EnvironmentContext envContext) throws TException {
+    HiveMetaHook hook = getHook(new_tbl);
+    if (hook != null) {
+      hook.preAlterTable(new_tbl, envContext);
+    }
+    boolean success = false;
+    try {
+      getDelegate().alter_table_with_environmentContext(dbname, tbl_name, new_tbl, envContext);
+      if (hook != null) {
+        hook.commitAlterTable(new_tbl, envContext);
+      }
+      success = true;
+    } finally {
+      if (!success && (hook != null)) {
+        hook.rollbackAlterTable(new_tbl, envContext);
+      }
+    }
+  }
+
+  @Override
+  public void alter_table(String catName, String dbName, String tbl_name, Table new_tbl,
+      EnvironmentContext envContext, String validWriteIds) throws TException {
+    HiveMetaHook hook = getHook(new_tbl);
+    if (hook != null) {
+      hook.preAlterTable(new_tbl, envContext);
+    }
+    boolean success = false;
+    try {
+      boolean skipAlter = envContext != null && envContext.getProperties() != null &&
+          Boolean.valueOf(envContext.getProperties().getOrDefault(HiveMetaHook.SKIP_METASTORE_ALTER, "false"));
+      if (!skipAlter) {
+        getDelegate().alter_table(catName, dbName, tbl_name, new_tbl, envContext, validWriteIds);
+      }
+
+      if (hook != null) {
+        hook.commitAlterTable(new_tbl, envContext);
+      }
+      success = true;
+    } finally {
+      if (!success && (hook != null)) {
+        hook.rollbackAlterTable(new_tbl, envContext);
+      }
+    }
+  }
+
+  @Override
+  public Catalog getCatalog(String catName) throws TException {
+    Catalog catalog = getDelegate().getCatalog(catName);
+    return catalog == null ?
+        null : FilterUtils.filterCatalogIfEnabled(isClientFilterEnabled, filterHook, catalog);
+  }
+
+  @Override
+  public List<String> getCatalogs() throws TException {
+    List<String> catalogs = getDelegate().getCatalogs();
+    return catalogs == null ?
+        null : FilterUtils.filterCatalogNamesIfEnabled(isClientFilterEnabled, filterHook, catalogs);
+  }
+
+  @Override
+  public List<Partition> add_partitions(List<Partition> parts, boolean ifNotExists, boolean needResults)
+      throws TException {
+    List<Partition> partitions = getDelegate().add_partitions(parts, ifNotExists, needResults);
+    if (needResults) {
+      return FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, partitions);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public List<String> getAllDataConnectorNames() throws MetaException, TException {
+    List<String> connectorNames = getDelegate().getAllDataConnectorNames();
+    return FilterUtils.filterDataConnectorsIfEnabled(isClientFilterEnabled, filterHook, connectorNames);
+  }
+
+  @Override
+  public void createTable(Table tbl) throws MetaException, NoSuchObjectException, TException {
+    CreateTableRequest request = new CreateTableRequest(tbl);
+    createTable(request);
+  }

Review Comment:
   We may also need `createTable(Table tbl, EnvironmentContext envContext)`
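For illustration, a minimal sketch of such an overload, mirroring the `createTable(Table)` wrapper quoted above; it assumes the Thrift-generated `CreateTableRequest` exposes a `setEnvContext(EnvironmentContext)` setter, which is an assumption here rather than something shown in the quoted diff:

```java
// Sketch only: carry the caller's EnvironmentContext into the request so the
// hook-aware createTable(CreateTableRequest) path is reused unchanged.
public void createTable(Table tbl, EnvironmentContext envContext)
    throws MetaException, NoSuchObjectException, TException {
  CreateTableRequest request = new CreateTableRequest(tbl);
  if (envContext != null) {
    request.setEnvContext(envContext); // assumes the generated setter exists
  }
  createTable(request);
}
```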
########## standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/client/HiveMetaStoreClientWithHook.java:
##########
@@ -0,0 +1,1250 @@
[...]
+  @Override
+  public void createTable(CreateTableRequest request)
+      throws MetaException, NoSuchObjectException, TException {
+    Table tbl = request.getTable();
+    HiveMetaHook hook = getHook(tbl);
+    if (hook != null) {
+      hook.preCreateTable(request);
+    }
+    boolean success = false;
+    try {
+      // Subclasses can override this step (for example, for temporary tables)
+      if (hook == null || !hook.createHMSTableInHook()) {
+        getDelegate().createTable(request);
+      }
+      if (hook != null) {
+        hook.commitCreateTable(tbl);
+      }
+      success = true;
+    } finally {
+      if (!success && (hook != null)) {
+        try {
+          hook.rollbackCreateTable(tbl);
+        } catch (Exception e) {
+          LOG.error("Create rollback failed with", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void createTableWithConstraints(Table tbl,
+      List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
+      List<SQLUniqueConstraint> uniqueConstraints,
+      List<SQLNotNullConstraint> notNullConstraints,
+      List<SQLDefaultConstraint> defaultConstraints,
+      List<SQLCheckConstraint> checkConstraints) throws TException {
+
+    CreateTableRequest createTableRequest = new CreateTableRequest(tbl);
+
+    if (!tbl.isSetCatName()) {
+      String defaultCat = getDefaultCatalog(conf);
+      tbl.setCatName(defaultCat);
+      if (primaryKeys != null) {
+        primaryKeys.forEach(pk -> pk.setCatName(defaultCat));
+      }
+      if (foreignKeys != null) {
+        foreignKeys.forEach(fk -> fk.setCatName(defaultCat));
+      }
+      if (uniqueConstraints != null) {
+        uniqueConstraints.forEach(uc -> uc.setCatName(defaultCat));
+        createTableRequest.setUniqueConstraints(uniqueConstraints);
+      }
+      if (notNullConstraints != null) {
+        notNullConstraints.forEach(nn -> nn.setCatName(defaultCat));
+      }
+      if (defaultConstraints != null) {
+        defaultConstraints.forEach(def -> def.setCatName(defaultCat));
+      }
+      if (checkConstraints != null) {
+        checkConstraints.forEach(cc -> cc.setCatName(defaultCat));
+      }
+    }
+
+    if (primaryKeys != null)
+      createTableRequest.setPrimaryKeys(primaryKeys);
+
+    if (foreignKeys != null)
+      createTableRequest.setForeignKeys(foreignKeys);
+
+    if (uniqueConstraints != null)
+      createTableRequest.setUniqueConstraints(uniqueConstraints);
+
+    if (notNullConstraints != null)
+      createTableRequest.setNotNullConstraints(notNullConstraints);
+
+    if (defaultConstraints != null)
+      createTableRequest.setDefaultConstraints(defaultConstraints);
+
+    if (checkConstraints != null)
+      createTableRequest.setCheckConstraints(checkConstraints);
+
+    createTable(createTableRequest);
+  }
+
+  @Override
+  public void dropDatabase(String name)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    dropDatabase(getDefaultCatalog(conf), name, true, false, false);
+  }
+
+  @Override
+  public void dropDatabase(String name, boolean deleteData, boolean ignoreUnknownDb) throws TException {
+    dropDatabase(getDefaultCatalog(conf), name, deleteData, ignoreUnknownDb, false);
+  }
+
+  @Override
+  public void dropDatabase(String name, boolean deleteData, boolean ignoreUnknownDb, boolean cascade)
+      throws TException {
+    dropDatabase(getDefaultCatalog(conf), name, deleteData, ignoreUnknownDb, cascade);
+  }
+
+  @Override
+  public void dropDatabase(DropDatabaseRequest req) throws TException {
+    if (req.isCascade()) {
+      // Note that this logic may drop some of the tables of the database
+      // even if the drop database fail for any reason
+      // TODO: Fix this
+      List<String> materializedViews = getDelegate().getTables(req.getName(), ".*", TableType.MATERIALIZED_VIEW);
+      for (String table : materializedViews) {
+        // First we delete the materialized views
+        Table materializedView = getDelegate().getTable(getDefaultCatalog(conf), req.getName(), table);
+        boolean isSoftDelete = req.isSoftDelete() && Boolean.parseBoolean(
+            materializedView.getParameters().get(SOFT_DELETE_TABLE));
+        materializedView.setTxnId(req.getTxnId());
+        getDelegate().dropTable(materializedView, req.isDeleteData() && !isSoftDelete, true, false);
+      }
+
+      /**
+       * When dropping db cascade, client side hooks have to be called at each table removal.
+       * If {@link org.apache.hadoop.hive.metastore.conf.MetastoreConf#ConfVars.BATCH_RETRIEVE_MAX
+       * BATCH_RETRIEVE_MAX} is less than the number of tables in the DB, we'll have to call the
+       * hooks one by one each alongside with a
+       * {@link #dropTable(String, String, boolean, boolean, EnvironmentContext) dropTable} call to
+       * ensure transactionality.
+       */
+      List<String> tableNameList = getDelegate().getAllTables(req.getName());
+      int tableCount = tableNameList.size();
+      int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+      LOG.debug("Selecting dropDatabase method for " + req.getName() + " (" + tableCount + " tables), " +
+          MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname() + "=" + maxBatchSize);
+
+      if (tableCount > maxBatchSize) {
+        LOG.debug("Dropping database in a per table batch manner.");
+        dropDatabaseCascadePerTable(req, tableNameList, maxBatchSize);
+      } else {
+        LOG.debug("Dropping database in a per DB manner.");
+        dropDatabaseCascadePerDb(req, tableNameList);
+      }
+
+    } else {
+      getDelegate().dropDatabase(req);
+    }
+  }
+
+  /**
+   * Handles dropDatabase by invoking drop_table in HMS for each table.
+   * Useful when table list in DB is too large to fit in memory. It will retrieve tables in
+   * chunks and for each table with a drop_table hook it will invoke drop_table on both HMS and
+   * the hook. This is a timely operation so hookless tables are skipped and will be dropped on
+   * server side when the client invokes drop_database.
+   * Note that this is 'less transactional' than dropDatabaseCascadePerDb since we're dropping
+   * table level objects, so the overall outcome of this method might result in a halfly dropped DB.
+   * @param tableList
+   * @param maxBatchSize
+   * @throws TException
+   */
+  private void dropDatabaseCascadePerTable(DropDatabaseRequest req, List<String> tableList, int maxBatchSize)
+      throws TException {
+    boolean ifPurge = false;
+    boolean ignoreUnknownTable = false;
+
+    for (Table table: new TableIterable(this, req.getCatalogName(), req.getName(), tableList, maxBatchSize)) {
+      boolean isSoftDelete =
+          req.isSoftDelete() && Boolean.parseBoolean(table.getParameters().get(SOFT_DELETE_TABLE));
+      boolean deleteData = req.isDeleteData() && !isSoftDelete;
+      getDelegate().dropTable(req.getCatalogName(), req.getName(), table.getTableName(), deleteData, ifPurge,
+          ignoreUnknownTable);
+      // SG:FIXME, introduce IMetaStoreClient.dropTable :: DropTableRequest -> void
+      // We need a new method that takes catName and checks tbl.isSetTxnId()
+      // Or does TxN stuff meaningful only if we are in default catalog?

Review Comment:
   This needs to invoke the hook.
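For illustration, a rough sketch of what invoking the hook around each per-table drop could look like, following the pre/commit/rollback pattern already used by the alter_table and createTable wrappers in this class; the helper name `dropTableWithHook` is hypothetical, and the delegate call mirrors the one in the quoted loop:

```java
// Sketch only: wrap the per-table drop with the table's HiveMetaHook so storage-handler
// tables get preDropTable/commitDropTable/rollbackDropTable callbacks during drop database.
private void dropTableWithHook(DropDatabaseRequest req, Table table, boolean deleteData,
    boolean ifPurge, boolean ignoreUnknownTable) throws TException {
  HiveMetaHook hook = getHook(table);
  if (hook != null) {
    hook.preDropTable(table);
  }
  boolean success = false;
  try {
    getDelegate().dropTable(req.getCatalogName(), req.getName(), table.getTableName(),
        deleteData, ifPurge, ignoreUnknownTable);
    if (hook != null) {
      hook.commitDropTable(table, deleteData);
    }
    success = true;
  } finally {
    if (!success && hook != null) {
      hook.rollbackDropTable(table);
    }
  }
}
```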
########## standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/client/HiveMetaStoreClientWithHook.java:
##########
@@ -0,0 +1,1250 @@
[...]
+  @Override
+  public void createTable(CreateTableRequest request)
+      throws MetaException, NoSuchObjectException, TException {
+    Table tbl = request.getTable();

Review Comment:
   It might be more consistent if we set the default catalog name when it's empty
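For illustration, one way the suggestion could look at the top of `createTable(CreateTableRequest)`, mirroring the `isSetCatName()` handling already present in `createTableWithConstraints` in this file; this is a sketch of the idea, not a definitive change:

```java
// Sketch only: fall back to the configured default catalog when the caller did not set one,
// so the hook and the delegate both see a fully qualified table.
Table tbl = request.getTable();
if (!tbl.isSetCatName()) {
  tbl.setCatName(getDefaultCatalog(conf));
}
HiveMetaHook hook = getHook(tbl);
// ... remainder of createTable(CreateTableRequest) unchanged ...
```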
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org