xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282322682
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ########## @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // ------ tables ------ + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogBaseTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateCatalogBaseTable(newCatalogTable); + + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + client.dropTable( + tablePath.getDatabaseName(), + tablePath.getObjectName(), + // Indicate whether associated data should be deleted. + // Set to 'true' for now because Flink tables shouldn't have data in Hive. 
Can be changed later if necessary + true, + ignoreIfNotExists); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to drop table %s", tablePath.getFullName()), e); + } + } + + @Override + public List<String> listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + try { + return client.getAllTables(databaseName); + } catch (UnknownDBException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to list tables in database %s", databaseName), e); + } + } + + @Override + public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + try { + return client.getTables( + databaseName, + null, // table pattern + TableType.VIRTUAL_VIEW); + } catch (UnknownDBException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to list views in database %s", databaseName), e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + try { + return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (UnknownDBException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e); + } + } + + private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { + try { + return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (NoSuchObjectException e) { + throw new TableNotExistException(catalogName, tablePath); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get table %s from Hive metastore", 
tablePath.getFullName()), e); + } + } + + /** + * Create a Flink's TableSchema from Hive table's columns and partition keys. + */ + protected TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) { + List<FieldSchema> allCols = new ArrayList<>(cols); + allCols.addAll(partitionKeys); + + String[] colNames = new String[allCols.size()]; + TypeInformation[] colTypes = new TypeInformation[allCols.size()]; + + for (int i = 0; i < allCols.size(); i++) { + FieldSchema fs = allCols.get(i); + + colNames[i] = fs.getName(); + colTypes[i] = HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())); + } + + return new TableSchema(colNames, colTypes); + } + + /** + * Create Hive columns from Flink TableSchema. + */ + protected List<FieldSchema> createHiveColumns(TableSchema schema) { Review comment: static? In fact, this one (and the one below) is a good candidate for a util class that can be shared by the two subclasses. It's a little odd that it is defined in the base class but never used there. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services