This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cf3f1  [FLINK-12232][hive] Support database related operations in HiveCatalog
a3cf3f1 is described below

commit a3cf3f1fc69e9ed17433cbc522c7086cead7790e
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Thu May 2 21:05:04 2019 -0700

    [FLINK-12232][hive] Support database related operations in HiveCatalog
    
    This PR creates HiveCatalog in flink-connector-hive module and implements database related APIs for HiveCatalog.
    
    This closes #8837
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 162 ++--------------
 .../flink/table/catalog/hive/HiveCatalog.java      | 210 +++++++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 200 ++++++++++++++++++++
 .../table/catalog/hive/HiveCatalogDatabase.java    |  51 +++--
 .../flink/table/catalog/hive/HiveCatalogUtil.java  |  45 ++---
 .../hive/GenericHiveMetastoreCatalogTest.java      |   2 +-
 .../flink/table/catalog/hive/HiveCatalogTest.java  | 179 ++++++++++++++++++
 .../table/catalog/GenericCatalogDatabase.java      |  20 +-
 .../flink/table/catalog/CatalogDatabase.java       |   7 +
 .../flink/table/catalog/CatalogTestBase.java       |   5 +
 10 files changed, 679 insertions(+), 202 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 50ed2e9..bb431cc 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -25,10 +25,8 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ReadableWritableCatalog;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -38,16 +36,10 @@ import 
org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
@@ -57,173 +49,43 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
  */
-public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
        private static final Logger LOG = 
LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
 
-       public static final String DEFAULT_DB = "default";
-
-       private final String catalogName;
-       private final HiveConf hiveConf;
-
-       private String currentDatabase = DEFAULT_DB;
-       private IMetaStoreClient client;
-
        public GenericHiveMetastoreCatalog(String catalogName, String 
hivemetastoreURI) {
-               this(catalogName, getHiveConf(hivemetastoreURI));
-       }
+               super(catalogName, hivemetastoreURI);
 
-       public GenericHiveMetastoreCatalog(String catalogName, HiveConf 
hiveConf) {
-               checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
-               this.catalogName = catalogName;
-
-               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
                LOG.info("Created GenericHiveMetastoreCatalog '{}'", 
catalogName);
        }
 
-       private static HiveConf getHiveConf(String hiveMetastoreURI) {
-               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
-
-               HiveConf hiveConf = new HiveConf();
-               hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
-               return hiveConf;
-       }
-
-       private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
-               try {
-                       return RetryingMetaStoreClient.getProxy(
-                               hiveConf,
-                               null,
-                               null,
-                               HiveMetaStoreClient.class.getName(),
-                               true);
-               } catch (MetaException e) {
-                       throw new CatalogException("Failed to create Hive 
metastore client", e);
-               }
-       }
-
-       @Override
-       public void open() throws CatalogException {
-               if (client == null) {
-                       client = getMetastoreClient(hiveConf);
-                       LOG.info("Connected to Hive metastore");
-               }
-       }
+       public GenericHiveMetastoreCatalog(String catalogName, HiveConf 
hiveConf) {
+               super(catalogName, hiveConf);
 
-       @Override
-       public void close() throws CatalogException {
-               if (client != null) {
-                       client.close();
-                       client = null;
-                       LOG.info("Close connection to Hive metastore");
-               }
+               LOG.info("Created GenericHiveMetastoreCatalog '{}'", 
catalogName);
        }
 
        // ------ databases ------
 
        @Override
-       public String getCurrentDatabase() throws CatalogException {
-               return currentDatabase;
-       }
-
-       @Override
-       public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
-               if (!databaseExists(databaseName)) {
-                       throw new DatabaseNotExistException(catalogName, 
databaseName);
-               }
-
-               currentDatabase = databaseName;
-       }
-
-       @Override
-       public List<String> listDatabases() throws CatalogException {
-               try {
-                       return client.getAllDatabases();
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to list all databases in 
%s", catalogName), e);
-               }
-       }
-
-       @Override
        public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-               Database hiveDb;
-
-               try {
-                       hiveDb = client.getDatabase(databaseName);
-               } catch (NoSuchObjectException e) {
-                       throw new DatabaseNotExistException(catalogName, 
databaseName);
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to get database %s from 
%s", databaseName, catalogName), e);
-               }
+               Database hiveDb = getHiveDatabase(databaseName);
 
                return new GenericCatalogDatabase(hiveDb.getParameters(), 
hiveDb.getDescription());
        }
 
        @Override
-       public boolean databaseExists(String databaseName) throws 
CatalogException {
-               try {
-                       return client.getDatabase(databaseName) != null;
-               } catch (NoSuchObjectException e) {
-                       return false;
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to determine whether 
database %s exists or not", databaseName), e);
-               }
+       public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
+                       throws DatabaseAlreadyExistException, CatalogException {
+               
createHiveDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, 
database), ignoreIfExists);
        }
 
        @Override
-       public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
-
-               try {
-                       
client.createDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, 
database));
-               } catch (AlreadyExistsException e) {
-                       if (!ignoreIfExists) {
-                               throw new 
DatabaseAlreadyExistException(catalogName, name);
-                       }
-               } catch (TException e) {
-                       throw new CatalogException(String.format("Failed to 
create database %s", name), e);
-               }
-       }
-
-       @Override
-       public void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
-               try {
-                       client.dropDatabase(name, true, ignoreIfNotExists);
-               } catch (NoSuchObjectException e) {
-                       if (!ignoreIfNotExists) {
-                               throw new 
DatabaseNotExistException(catalogName, name);
-                       }
-               } catch (InvalidOperationException e) {
-                       if (e.getMessage().startsWith(String.format("Database 
%s is not empty", name))) {
-                               throw new 
DatabaseNotEmptyException(catalogName, name);
-                       } else {
-                               throw new 
CatalogException(String.format("Failed to drop database %s", name), e);
-                       }
-               } catch (TException e) {
-                       throw new CatalogException(String.format("Failed to 
drop database %s", name), e);
-               }
-       }
-
-       @Override
-       public void alterDatabase(String name, CatalogDatabase newDatabase, 
boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
-               try {
-                       if (databaseExists(name)) {
-                               client.alterDatabase(name, 
GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase));
-                       } else if (!ignoreIfNotExists) {
-                               throw new 
DatabaseNotExistException(catalogName, name);
-                       }
-               } catch (TException e) {
-                       throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
-               }
+       public void alterDatabase(String name, CatalogDatabase newDatabase, 
boolean ignoreIfNotExists)
+                       throws DatabaseNotExistException, CatalogException {
+               alterHiveDatabase(name, 
GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase), 
ignoreIfNotExists);
        }
 
        // ------ tables and views------
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
new file mode 100644
index 0000000..7f96852
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A catalog implementation for Hive.
+ */
+public class HiveCatalog extends HiveCatalogBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+
+       public HiveCatalog(String catalogName, String hivemetastoreURI) {
+               super(catalogName, hivemetastoreURI);
+
+               LOG.info("Created HiveCatalog '{}'", catalogName);
+       }
+
+       public HiveCatalog(String catalogName, HiveConf hiveConf) {
+               super(catalogName, hiveConf);
+
+               LOG.info("Created HiveCatalog '{}'", catalogName);
+       }
+
+       // ------ databases ------
+
+       @Override
+       public CatalogDatabase getDatabase(String databaseName)
+                       throws DatabaseNotExistException, CatalogException {
+               Database hiveDb = getHiveDatabase(databaseName);
+
+               return new HiveCatalogDatabase(
+                       hiveDb.getParameters(), hiveDb.getLocationUri(), 
hiveDb.getDescription());
+       }
+
+       @Override
+       public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
+                       throws DatabaseAlreadyExistException, CatalogException {
+               createHiveDatabase(HiveCatalogUtil.createHiveDatabase(name, 
database), ignoreIfExists);
+       }
+
+       @Override
+       public void alterDatabase(String name, CatalogDatabase newDatabase, 
boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+               alterHiveDatabase(name, 
HiveCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists);
+       }
+
+       // ------ tables and views------
+
+       @Override
+       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+                       throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+                       throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listTables(String databaseName)
+                       throws DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath objectPath) throws 
CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ partitions ------
+
+       @Override
+       public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+                       throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, 
PartitionAlreadyExistsException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
+                       throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+                       throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+                       throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+                       throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CatalogPartition getPartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+                       throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec) throws CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ functions ------
+
+       @Override
+       public void createFunction(ObjectPath functionPath, CatalogFunction 
function, boolean ignoreIfExists)
+                       throws FunctionAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterFunction(ObjectPath functionPath, CatalogFunction 
newFunction, boolean ignoreIfNotExists)
+                       throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropFunction(ObjectPath functionPath, boolean 
ignoreIfNotExists)
+                       throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listFunctions(String dbName) throws 
DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CatalogFunction getFunction(ObjectPath functionPath) throws 
FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean functionExists(ObjectPath functionPath) throws 
CatalogException {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
new file mode 100644
index 0000000..b044de3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for catalogs backed by Hive metastore.
+ */
+public abstract class HiveCatalogBase implements ReadableWritableCatalog {
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogBase.class);
+
+       public static final String DEFAULT_DB = "default";
+
+       protected final String catalogName;
+       protected final HiveConf hiveConf;
+
+       protected String currentDatabase = DEFAULT_DB;
+       protected IMetaStoreClient client;
+
+       public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
+               this(catalogName, getHiveConf(hivemetastoreURI));
+       }
+
+       public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+               this.catalogName = catalogName;
+
+               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
+       }
+
+       private static HiveConf getHiveConf(String hiveMetastoreURI) {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+               HiveConf hiveConf = new HiveConf();
+               hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+               return hiveConf;
+       }
+
+       private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+               try {
+                       return RetryingMetaStoreClient.getProxy(
+                               hiveConf,
+                               null,
+                               null,
+                               HiveMetaStoreClient.class.getName(),
+                               true);
+               } catch (MetaException e) {
+                       throw new CatalogException("Failed to create Hive 
metastore client", e);
+               }
+       }
+
+       @Override
+       public void open() throws CatalogException {
+               if (client == null) {
+                       client = getMetastoreClient(hiveConf);
+                       LOG.info("Connected to Hive metastore");
+               }
+       }
+
+       @Override
+       public void close() throws CatalogException {
+               if (client != null) {
+                       client.close();
+                       client = null;
+                       LOG.info("Close connection to Hive metastore");
+               }
+       }
+
+       // ------ databases ------
+
+       @Override
+       public String getCurrentDatabase() throws CatalogException {
+               return currentDatabase;
+       }
+
+       @Override
+       public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+               if (!databaseExists(databaseName)) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               }
+
+               currentDatabase = databaseName;
+       }
+
+       @Override
+       public List<String> listDatabases() throws CatalogException {
+               try {
+                       return client.getAllDatabases();
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list all databases in 
%s", catalogName), e);
+               }
+       }
+
+       @Override
+       public boolean databaseExists(String databaseName) throws 
CatalogException {
+               try {
+                       return client.getDatabase(databaseName) != null;
+               } catch (NoSuchObjectException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to determine whether 
database %s exists or not", databaseName), e);
+               }
+       }
+
+       @Override
+       public void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+               try {
+                       client.dropDatabase(name, true, ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new 
DatabaseNotExistException(catalogName, name);
+                       }
+               } catch (InvalidOperationException e) {
+                       throw new DatabaseNotEmptyException(catalogName, name);
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
drop database %s", name), e);
+               }
+       }
+
+       protected Database getHiveDatabase(String databaseName) throws 
DatabaseNotExistException {
+               try {
+                       return client.getDatabase(databaseName);
+               } catch (NoSuchObjectException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get database %s from 
%s", databaseName, catalogName), e);
+               }
+       }
+
+       protected void createHiveDatabase(Database hiveDatabase, boolean 
ignoreIfExists)
+                       throws DatabaseAlreadyExistException, CatalogException {
+               try {
+                       client.createDatabase(hiveDatabase);
+               } catch (AlreadyExistsException e) {
+                       if (!ignoreIfExists) {
+                               throw new 
DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
create database %s", hiveDatabase.getName()), e);
+               }
+       }
+
+       protected void alterHiveDatabase(String name, Database newHiveDatabase, 
boolean ignoreIfNotExists)
+                       throws DatabaseNotExistException, CatalogException {
+               try {
+                       if (databaseExists(name)) {
+                               client.alterDatabase(name, newHiveDatabase);
+                       } else if (!ignoreIfNotExists) {
+                               throw new 
DatabaseNotExistException(catalogName, name);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
similarity index 55%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
index 959247a..a7cf5c2 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
@@ -16,38 +16,62 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog database implementation.
+ * A Hive catalog database implementation.
  */
-public class GenericCatalogDatabase implements CatalogDatabase {
+public class HiveCatalogDatabase implements CatalogDatabase {
+       // Property of the database
        private final Map<String, String> properties;
+       // HDFS path of the database
+       private String location;
        // Comment of the database
-       private String comment = "This is a generic catalog database.";
+       private String comment = "This is a hive catalog database.";
+
+       public HiveCatalogDatabase() {
+               properties = new HashMap<>();
+       }
 
-       public GenericCatalogDatabase(Map<String, String> properties) {
+       public HiveCatalogDatabase(Map<String, String> properties) {
                this.properties = checkNotNull(properties, "properties cannot 
be null");
        }
 
-       public GenericCatalogDatabase(Map<String, String> properties, String 
comment) {
+       public HiveCatalogDatabase(Map<String, String> properties, String 
comment) {
                this(properties);
                this.comment = checkNotNull(comment, "comment cannot be null");
        }
 
+       public HiveCatalogDatabase(Map<String, String> properties, String 
location, String comment) {
+               this(properties, comment);
+
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(location), 
"location cannot be null or empty");
+               this.location = location;
+       }
+
+       @Override
        public Map<String, String> getProperties() {
                return properties;
        }
 
        @Override
-       public GenericCatalogDatabase copy() {
-               return new GenericCatalogDatabase(new HashMap<>(properties), 
comment);
+       public String getComment() {
+               return comment;
+       }
+
+       @Override
+       public HiveCatalogDatabase copy() {
+               return new HiveCatalogDatabase(new HashMap<>(properties), 
location, comment);
        }
 
        @Override
@@ -57,15 +81,10 @@ public class GenericCatalogDatabase implements 
CatalogDatabase {
 
        @Override
        public Optional<String> getDetailedDescription() {
-               return Optional.of("This is a generic catalog database stored 
in memory only");
+               return Optional.of("This is a Hive catalog database stored in 
Hive metastore");
        }
 
-       public String getComment() {
-               return this.comment;
+       public String getLocation() {
+               return location;
        }
-
-       public void setComment(String comment) {
-               this.comment = comment;
-       }
-
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
similarity index 53%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
copy to 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
index 89233da..e972ba4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
@@ -16,38 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
 
-import java.util.Map;
-import java.util.Optional;
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
 
 /**
- * Represents a database object in a catalog.
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
  */
-public interface CatalogDatabase {
-       /**
-        * Get a map of properties associated with the database.
-        */
-       Map<String, String> getProperties();
+public class HiveCatalogUtil {
 
-       /**
-        * Get a deep copy of the CatalogDatabase instance.
-        *
-        * @return a copy of CatalogDatabase instance
-        */
-       CatalogDatabase copy();
+       private HiveCatalogUtil() {
+       }
 
-       /**
-        * Get a brief description of the database.
-        *
-        * @return an optional short description of the database
-        */
-       Optional<String> getDescription();
+       // ------ Utils ------
 
        /**
-        * Get a detailed description of the database.
-        *
-        * @return an optional long description of the database
+        * Creates a Hive database from CatalogDatabase.
         */
-       Optional<String> getDetailedDescription();
+       public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
+               HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) 
db;
+
+               return new Database(
+                       dbName,
+                       db.getComment(),
+                       hiveCatalogDatabase.getLocation(),
+                       hiveCatalogDatabase.getProperties());
+
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 2687699..76f8e08 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -95,7 +95,7 @@ public class GenericHiveMetastoreCatalogTest extends 
CatalogTestBase {
 
        @Override
        public String getBuiltInDefaultDatabase() {
-               return GenericHiveMetastoreCatalog.DEFAULT_DB;
+               return HiveCatalogBase.DEFAULT_DB;
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
new file mode 100644
index 0000000..7b132c9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Test for HiveCatalog.
+ */
+public class HiveCatalogTest extends CatalogTestBase {
+       @BeforeClass
+       public static void init() throws IOException {
+               catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+               catalog.open();
+       }
+
+       // =====================
+       // HiveCatalog doesn't support table operations yet
+       // Thus, overriding the following tests which involve table operations 
in CatalogTestBase so they won't run against HiveCatalog
+       // =====================
+
+       // TODO: re-enable these tests once HiveCatalog supports table operations
+       @Test
+       public void testDropDb_DatabaseNotEmptyException() throws Exception {
+       }
+
+       @Test
+       public void testCreateTable_Streaming() throws Exception {
+       }
+
+       @Test
+       public void testCreateTable_Batch() throws Exception {
+       }
+
+       @Test
+       public void testCreateTable_DatabaseNotExistException() throws 
Exception {
+       }
+
+       @Test
+       public void testCreateTable_TableAlreadyExistException() throws 
Exception {
+       }
+
+       @Test
+       public void testCreateTable_TableAlreadyExist_ignored() throws 
Exception {
+       }
+
+       @Test
+       public void testGetTable_TableNotExistException() throws Exception {
+       }
+
+       @Test
+       public void testGetTable_TableNotExistException_NoDb() throws Exception 
{
+       }
+
+       @Test
+       public void testDropTable_nonPartitionedTable() throws Exception {
+       }
+
+       @Test
+       public void testDropTable_TableNotExistException() throws Exception {
+       }
+
+       @Test
+       public void testDropTable_TableNotExist_ignored() throws Exception {
+       }
+
+       @Test
+       public void testAlterTable() throws Exception {
+       }
+
+       @Test
+       public void testAlterTable_TableNotExistException() throws Exception {
+       }
+
+       @Test
+       public void testAlterTable_TableNotExist_ignored() throws Exception {
+       }
+
+       @Test
+       public void testRenameTable_nonPartitionedTable() throws Exception {
+       }
+
+       @Test
+       public void testRenameTable_TableNotExistException() throws Exception {
+       }
+
+       @Test
+       public void testRenameTable_TableNotExistException_ignored() throws 
Exception {
+       }
+
+       @Test
+       public void testRenameTable_TableAlreadyExistException() throws 
Exception {
+       }
+
+       @Test
+       public void testTableExists() throws Exception {
+       }
+
+       // ------ utils ------
+
+       @Override
+       public String getBuiltInDefaultDatabase() {
+               return HiveCatalogBase.DEFAULT_DB;
+       }
+
+       @Override
+       public CatalogDatabase createDb() {
+               return new HiveCatalogDatabase(
+                       new HashMap<String, String>() {{
+                               put("k1", "v1");
+                       }},
+                       TEST_COMMENT
+               );
+       }
+
+       @Override
+       public CatalogDatabase createAnotherDb() {
+               return new HiveCatalogDatabase(
+                       new HashMap<String, String>() {{
+                               put("k2", "v2");
+                       }},
+                       TEST_COMMENT
+               );
+       }
+
+       @Override
+       public CatalogTable createTable() {
+               // TODO: implement this once HiveCatalog supports table 
operations
+               return null;
+       }
+
+       @Override
+       public CatalogTable createAnotherTable() {
+               // TODO: implement this once HiveCatalog supports table 
operations
+               return null;
+       }
+
+       @Override
+       public CatalogTable createStreamingTable() {
+               // TODO: implement this once HiveCatalog supports table 
operations
+               return null;
+       }
+
+       @Override
+       public CatalogTable createPartitionedTable() {
+               // TODO: implement this once HiveCatalog supports table 
operations
+               return null;
+       }
+
+       @Override
+       public CatalogTable createAnotherPartitionedTable() {
+               // TODO: implement this once HiveCatalog supports table 
operations
+               return null;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index 959247a..d5df274 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -29,9 +29,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class GenericCatalogDatabase implements CatalogDatabase {
        private final Map<String, String> properties;
-       // Comment of the database
        private String comment = "This is a generic catalog database.";
 
+       public GenericCatalogDatabase() {
+               this.properties = new HashMap<>();
+       }
+
        public GenericCatalogDatabase(Map<String, String> properties) {
                this.properties = checkNotNull(properties, "properties cannot 
be null");
        }
@@ -41,11 +44,17 @@ public class GenericCatalogDatabase implements 
CatalogDatabase {
                this.comment = checkNotNull(comment, "comment cannot be null");
        }
 
+       @Override
        public Map<String, String> getProperties() {
                return properties;
        }
 
        @Override
+       public String getComment() {
+               return this.comment;
+       }
+
+       @Override
        public GenericCatalogDatabase copy() {
                return new GenericCatalogDatabase(new HashMap<>(properties), 
comment);
        }
@@ -59,13 +68,4 @@ public class GenericCatalogDatabase implements 
CatalogDatabase {
        public Optional<String> getDetailedDescription() {
                return Optional.of("This is a generic catalog database stored 
in memory only");
        }
-
-       public String getComment() {
-               return this.comment;
-       }
-
-       public void setComment(String comment) {
-               this.comment = comment;
-       }
-
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
index 89233da..8b14ff8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
@@ -31,6 +31,13 @@ public interface CatalogDatabase {
        Map<String, String> getProperties();
 
        /**
+        * Get comment of the database.
+        *
+        * @return comment of the database
+        */
+       String getComment();
+
+       /**
         * Get a deep copy of the CatalogDatabase instance.
         *
         * @return a copy of CatalogDatabase instance
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 8117c82..470a9a9 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -109,11 +109,16 @@ public abstract class CatalogTestBase {
        @Test
        public void testSetCurrentDatabase() throws Exception {
                assertEquals(getBuiltInDefaultDatabase(), 
catalog.getCurrentDatabase());
+
                catalog.createDatabase(db2, createDb(), true);
                catalog.setCurrentDatabase(db2);
+
                assertEquals(db2, catalog.getCurrentDatabase());
+
                catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
+
                assertEquals(getBuiltInDefaultDatabase(), 
catalog.getCurrentDatabase());
+
                catalog.dropDatabase(db2, false);
        }
 

Reply via email to