xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282199916
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##########
 @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
        // ------ tables and views------
 
        @Override
-       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-                       throws TableNotExistException, CatalogException {
-               try {
-                       client.dropTable(
-                               tablePath.getDatabaseName(),
-                               tablePath.getObjectName(),
-                               // Indicate whether associated data should be 
deleted.
-                               // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
-                               true,
-                               ignoreIfNotExists);
-               } catch (NoSuchObjectException e) {
-                       if (!ignoreIfNotExists) {
-                               throw new TableNotExistException(catalogName, 
tablePath);
-                       }
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+       public void validateCatalogBaseTable(CatalogBaseTable table) throws 
IllegalArgumentException {
+               // TODO: invalidate HiveCatalogView
+               if (table instanceof HiveCatalogTable) {
+                       throw new IllegalArgumentException(
+                               "Please use HiveCatalog to operate on 
HiveCatalogTable and HiveCatalogView.");
                }
        }
 
        @Override
-       public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-                       throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-               try {
-                       // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
-                       if (tableExists(tablePath)) {
-                               ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
-                               // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
-                               if (tableExists(newPath)) {
-                                       throw new 
TableAlreadyExistException(catalogName, newPath);
-                               } else {
-                                       Table table = getHiveTable(tablePath);
-                                       table.setTableName(newTableName);
-                                       
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
-                               }
-                       } else if (!ignoreIfNotExists) {
-                               throw new TableNotExistException(catalogName, 
tablePath);
-                       }
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
-               }
-       }
+       public CatalogBaseTable createCatalogTable(Table hiveTable) {
+               // Table schema
+               TableSchema tableSchema = createTableSchema(
+                       hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
 
-       @Override
-       public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-                       throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-               if (!databaseExists(tablePath.getDatabaseName())) {
-                       throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
-               } else {
-                       try {
-                               
client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, 
table));
-                       } catch (AlreadyExistsException e) {
-                               if (!ignoreIfExists) {
-                                       throw new 
TableAlreadyExistException(catalogName, tablePath);
-                               }
-                       } catch (TException e) {
-                               throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
-                       }
+               // Table properties
+               Map<String, String> properties = 
retrieveFlinkProperties(hiveTable.getParameters());
+
+               // Table comment
+               String comment = 
properties.remove(HiveTableConfig.TABLE_COMMENT);
+
+               // Partition keys
+               List<String> partitionKeys = new ArrayList<>();
+
+               if (!hiveTable.getPartitionKeys().isEmpty()) {
+                       partitionKeys = hiveTable.getPartitionKeys().stream()
+                               .map(fs -> fs.getName())
+                               .collect(Collectors.toList());
                }
-       }
 
-       @Override
-       public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-                       throws TableNotExistException, CatalogException {
-               if (!tableExists(tablePath)) {
-                       if (!ignoreIfNotExists) {
-                               throw new TableNotExistException(catalogName, 
tablePath);
-                       }
+               if (TableType.valueOf(hiveTable.getTableType()) == 
TableType.VIRTUAL_VIEW) {
+                       return new GenericCatalogView(
+                               hiveTable.getViewOriginalText(),
+                               hiveTable.getViewExpandedText(),
+                               tableSchema,
+                               properties,
+                               comment
+                       );
                } else {
-                       // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
-                       // Thus we have to translate alterTable() into 
(dropTable() + createTable())
-                       dropTable(tablePath, false);
-                       try {
-                               createTable(tablePath, newTable, false);
-                       } catch (TableAlreadyExistException | 
DatabaseNotExistException e) {
-                               // These exceptions wouldn't be thrown, unless 
a concurrent operation is triggered in Hive
-                               throw new CatalogException(
-                                       String.format("Failed to alter table 
%s", tablePath), e);
-                       }
+                       return new GenericCatalogTable(
+                               tableSchema, partitionKeys, properties, 
comment);
                }
        }
 
        @Override
-       public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-               try {
-                       return client.getAllTables(databaseName);
-               } catch (UnknownDBException e) {
-                       throw new DatabaseNotExistException(catalogName, 
databaseName);
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to list tables in 
database %s", databaseName), e);
-               }
-       }
+       public Table createHiveTable(ObjectPath tablePath, CatalogBaseTable 
table) {
+               Map<String, String> properties = new 
HashMap<>(table.getProperties());
 
-       @Override
-       public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-               try {
-                       return client.getTables(
-                               databaseName,
-                               null, // table pattern
-                               TableType.VIRTUAL_VIEW);
-               } catch (UnknownDBException e) {
-                       throw new DatabaseNotExistException(catalogName, 
databaseName);
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to list views in database 
%s", databaseName), e);
-               }
-       }
+               // Table comment
+               properties.put(HiveTableConfig.TABLE_COMMENT, 
table.getComment());
 
-       @Override
-       public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
-               Table hiveTable = getHiveTable(tablePath);
+               Table hiveTable = new Table();
+               hiveTable.setDbName(tablePath.getDatabaseName());
+               hiveTable.setTableName(tablePath.getObjectName());
+               hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
 
-               return 
GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
-       }
+               // Table properties
+               hiveTable.setParameters(buildFlinkProperties(properties));
 
 Review comment:
   Nit: remove the leading spaces.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to