bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282251084
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ########## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // ------ tables ------ + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: In memory catalog also doesn't check that currently. We should solve that in "FLINK-12452: alterTable() in all catalogs should ensure existing base table and the new one are of the same type" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services