bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282251084
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##########
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
                        throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
                }
        }
+
+       // ------ tables ------
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath tablePath)
+                       throws TableNotExistException, CatalogException {
+               return createCatalogTable(getHiveTable(tablePath));
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+                       throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+               validateCatalogBaseTable(table);
+
+               if (!databaseExists(tablePath.getDatabaseName())) {
+                       throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+               } else {
+                       try {
+                               client.createTable(createHiveTable(tablePath, 
table));
+                       } catch (AlreadyExistsException e) {
+                               if (!ignoreIfExists) {
+                                       throw new 
TableAlreadyExistException(catalogName, tablePath);
+                               }
+                       } catch (TException e) {
+                               throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+                       }
+               }
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+                       throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+               try {
+                       // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+                       // Thus, check the table existence explicitly
+                       if (tableExists(tablePath)) {
+                               ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+                               // alter_table() doesn't throw a clear 
exception when new table already exists.
+                               // Thus, check the table existence explicitly
+                               if (tableExists(newPath)) {
+                                       throw new 
TableAlreadyExistException(catalogName, newPath);
+                               } else {
+                                       Table table = getHiveTable(tablePath);
+                                       table.setTableName(newTableName);
+                                       
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+                               }
+                       } else if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               try {
 
 Review comment:
   The in-memory catalog doesn't check that currently either. We should solve that in 
"FLINK-12452: alterTable() in all catalogs should ensure existing base table 
and the new one are of the same type".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to