xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322682
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##########
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
                        throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
                }
        }
+
+       // ------ tables ------
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath tablePath)
+                       throws TableNotExistException, CatalogException {
+               return createCatalogBaseTable(getHiveTable(tablePath));
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+                       throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+               validateCatalogBaseTable(table);
+
+               if (!databaseExists(tablePath.getDatabaseName())) {
+                       throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+               } else {
+                       try {
+                               client.createTable(createHiveTable(tablePath, 
table));
+                       } catch (AlreadyExistsException e) {
+                               if (!ignoreIfExists) {
+                                       throw new 
TableAlreadyExistException(catalogName, tablePath);
+                               }
+                       } catch (TException e) {
+                               throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+                       }
+               }
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+                       throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+               try {
+                       // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+                       // Thus, check the table existence explicitly
+                       if (tableExists(tablePath)) {
+                               ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+                               // alter_table() doesn't throw a clear 
exception when new table already exists.
+                               // Thus, check the table existence explicitly
+                               if (tableExists(newPath)) {
+                                       throw new 
TableAlreadyExistException(catalogName, newPath);
+                               } else {
+                                       Table table = getHiveTable(tablePath);
+                                       table.setTableName(newTableName);
+                                       
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+                               }
+                       } else if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               validateCatalogBaseTable(newCatalogTable);
+
+               try {
+                       if (!tableExists(tablePath)) {
+                               if (!ignoreIfNotExists) {
+                                       throw new 
TableNotExistException(catalogName, tablePath);
+                               }
+                       } else {
+                               // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+                               Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+                               // client.alter_table() requires a valid 
location
+                               // thus, if new table doesn't have that, it 
reuses location of the old table
+                               if (!newTable.getSd().isSetLocation()) {
+                                       Table oldTable = 
getHiveTable(tablePath);
+                                       
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+                               }
+
+                               client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               try {
+                       client.dropTable(
+                               tablePath.getDatabaseName(),
+                               tablePath.getObjectName(),
+                               // Indicate whether associated data should be 
deleted.
+                               // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
+                               true,
+                               ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public List<String> listTables(String databaseName)
+                       throws DatabaseNotExistException, CatalogException {
+               try {
+                       return client.getAllTables(databaseName);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list tables in 
database %s", databaseName), e);
+               }
+       }
+
+       @Override
+       public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               try {
+                       return client.getTables(
+                               databaseName,
+                               null, // table pattern
+                               TableType.VIRTUAL_VIEW);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list views in database 
%s", databaseName), e);
+               }
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath tablePath) throws 
CatalogException {
+               try {
+                       return client.tableExists(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               } catch (UnknownDBException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to check whether table %s 
exists or not.", tablePath.getFullName()), e);
+               }
+       }
+
+       private Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+               try {
+                       return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               } catch (NoSuchObjectException e) {
+                       throw new TableNotExistException(catalogName, 
tablePath);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+               }
+       }
+
+       /**
+        * Create a Flink's TableSchema from Hive table's columns and partition 
keys.
+        */
+       protected TableSchema createTableSchema(List<FieldSchema> cols, 
List<FieldSchema> partitionKeys) {
+               List<FieldSchema> allCols = new ArrayList<>(cols);
+               allCols.addAll(partitionKeys);
+
+               String[] colNames = new String[allCols.size()];
+               TypeInformation[] colTypes = new 
TypeInformation[allCols.size()];
+
+               for (int i = 0; i < allCols.size(); i++) {
+                       FieldSchema fs = allCols.get(i);
+
+                       colNames[i] = fs.getName();
+                       colTypes[i] = 
HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+               }
+
+               return new TableSchema(colNames, colTypes);
+       }
+
+       /**
+        * Create Hive columns from Flink TableSchema.
+        */
+       protected List<FieldSchema> createHiveColumns(TableSchema schema) {
 
 Review comment:
   static? In fact, this one (and the one below) is a good candidate in a util 
class that can be shared by the two subclasses. It's a little odd that it is 
defined in the base class but is never used there.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to