xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282199916
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ########## @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // ------ tables and views------ @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - try { - client.dropTable( - tablePath.getDatabaseName(), - tablePath.getObjectName(), - // Indicate whether associated data should be deleted. - // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary - true, - ignoreIfNotExists); - } catch (NoSuchObjectException e) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to drop table %s", tablePath.getFullName()), e); + public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException { + // TODO: invalidate HiveCatalogView + if (table instanceof HiveCatalogTable) { + throw new IllegalArgumentException( + "Please use HiveCatalog to operate on HiveCatalogTable and HiveCatalogView."); } } @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - try { - // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly - if (tableExists(tablePath)) { - ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); - // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly - if (tableExists(newPath)) { - throw new TableAlreadyExistException(catalogName, newPath); - } else { - Table table = getHiveTable(tablePath); - table.setTableName(newTableName); - client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); - } - } else if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to rename table %s", tablePath.getFullName()), e); - } - } + public CatalogBaseTable createCatalogTable(Table hiveTable) { + // Table schema + TableSchema tableSchema = createTableSchema( + hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); - @Override - public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!databaseExists(tablePath.getDatabaseName())) { - throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); - } else { - try { - client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table)); - } catch (AlreadyExistsException e) { - if (!ignoreIfExists) { - throw new TableAlreadyExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); - } + // Table properties + Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters()); + + // Table comment + String comment = properties.remove(HiveTableConfig.TABLE_COMMENT); + + // Partition keys + List<String> partitionKeys = new ArrayList<>(); + + if (!hiveTable.getPartitionKeys().isEmpty()) { + partitionKeys = hiveTable.getPartitionKeys().stream() + .map(fs -> fs.getName()) + .collect(Collectors.toList()); } - } - @Override - public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - if (!tableExists(tablePath)) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } + if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) { + return new GenericCatalogView( + hiveTable.getViewOriginalText(), + hiveTable.getViewExpandedText(), + tableSchema, + properties, + comment + ); } else { - // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case - // Thus we have to translate alterTable() into (dropTable() + createTable()) - dropTable(tablePath, false); - try { - createTable(tablePath, newTable, false); - } catch (TableAlreadyExistException | DatabaseNotExistException e) { - // These exceptions wouldn't be thrown, unless a concurrent operation is triggered in Hive - throw new CatalogException( - String.format("Failed to alter table %s", tablePath), e); - } + return new GenericCatalogTable( + tableSchema, partitionKeys, properties, comment); } } @Override - public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { - try { - return client.getAllTables(databaseName); - } catch (UnknownDBException e) { - throw new DatabaseNotExistException(catalogName, databaseName); - } catch (TException e) { - throw new CatalogException( - String.format("Failed to list tables in database %s", databaseName), e); - } - } + public Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map<String, String> properties = new HashMap<>(table.getProperties()); - @Override - public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { - try { - return client.getTables( - databaseName, - null, // table pattern - TableType.VIRTUAL_VIEW); - } catch (UnknownDBException e) { - throw new DatabaseNotExistException(catalogName, databaseName); - } catch (TException e) { - throw new CatalogException( - String.format("Failed to list views in database %s", databaseName), e); - } - } + // Table comment + properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment()); - @Override - public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - Table hiveTable = getHiveTable(tablePath); + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); - return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable); - } + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); Review comment: Nit: remove the leading spaces. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services