[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280604389
 
 

 ##
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
 	@Override
 	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
 			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			client.dropTable(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				// Indicate whether associated data should be deleted.
+				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
 
 Review comment:
   yeah, later when necessary. Along with 'purge'
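For context, the call being wired up here is the Hive metastore client's `dropTable(db, table, deleteData, ignoreUnknownTable)`. A minimal, self-contained sketch of the semantics under discussion, with a hypothetical stub standing in for the real `IMetaStoreClient`:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stub standing in for Hive's IMetaStoreClient. It models only the
// dropTable(db, table, deleteData, ignoreUnknownTable) semantics discussed in the
// review; the real client also removes metadata from the metastore and files from HDFS.
class StubMetastoreClient {
	final Set<String> tables = new HashSet<>();   // registered tables, as "db.table"
	final Set<String> dataDirs = new HashSet<>(); // simulated table data locations

	void dropTable(String db, String table, boolean deleteData, boolean ignoreUnknownTable) {
		String key = db + "." + table;
		if (!tables.remove(key)) {
			if (!ignoreUnknownTable) {
				throw new IllegalStateException("NoSuchObjectException: " + key);
			}
			return; // swallow the error, mirroring the catalog's ignoreIfNotExists flag
		}
		if (deleteData) {
			// deleteData == true also drops the table's files; 'true' is considered
			// acceptable here because Flink tables are not expected to keep data in Hive
			dataDirs.remove(key);
		}
	}
}
```

The 'purge' follow-up mentioned in the comment refers to Hive's dropTable variants that additionally take a purge flag (delete files permanently instead of moving them to the trash).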


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280606701
 
 

 ##
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
 	private final List<String> partitionKeys;
 	// Properties of the table
 	private final Map<String, String> properties;
-	// Comment of the table
-	private String comment = "This is a generic catalog table.";
+	// Description of the table
+	private String description = "This is a generic catalog table.";
 
 Review comment:
   reverted. https://issues.apache.org/jira/browse/FLINK-12395





[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280604389
 
 

 ##
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
 	@Override
 	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
 			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			client.dropTable(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				// Indicate whether associated data should be deleted.
+				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
 
 Review comment:
   agree, along with 'purge'




[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257423
 
 

 ##
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+	// Flink tables should be stored as 'external' tables in Hive metastore
+	private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+		put("EXTERNAL", "TRUE");
+	}};
+
 	private GenericHiveMetastoreCatalogUtil() {
 	}
 
 	// -- Utils --
 
 	/**
-	 * Creates a Hive database from CatalogDatabase.
+	 * Creates a Hive database from a CatalogDatabase.
+	 *
+	 * @param databaseName name of the database
+	 * @param catalogDatabase the CatalogDatabase instance
+	 * @return a Hive database
 	 */
-	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-		Map<String, String> props = db.getProperties();
+	public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
 		return new Database(
-			dbName,
-			db.getDescription().isPresent() ? db.getDescription().get() : null,
+			databaseName,
+			catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
 			null,
-			props);
+			catalogDatabase.getProperties());
+	}
+
+	/**
+	 * Creates a Hive table from a CatalogBaseTable.
+	 *
+	 * @param tablePath path of the table
+	 * @param table the CatalogBaseTable instance
+	 * @return a Hive table
+	 */
+	public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+
+		// Table description
+		if (table.getDescription().isPresent()) {
+			properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get());
+		}
+
+		Table hiveTable = new Table();
+		hiveTable.setDbName(tablePath.getDatabaseName());
+		hiveTable.setTableName(tablePath.getObjectName());
+		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+		// Table properties
+		hiveTable.setParameters(buildFlinkProperties(properties));
+		hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+		// Hive table's StorageDescriptor
+		StorageDescriptor sd = new StorageDescriptor();
+		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+		List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+		// Table columns and partition keys
+		CatalogTable catalogTable = (CatalogTable) table;
+
+		if (catalogTable.isPartitioned()) {
+			int partitionKeySize = catalogTable.getPartitionKeys().size();
+			List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+			List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+			sd.setCols(regularColumns);
+			hiveTable.setPartitionKeys(part

[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280287299
 
 

 ##
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+	// Flink tables should be stored as 'external' tables in Hive metastore
+	private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+		put("EXTERNAL", "TRUE");
+	}};
+
 	private GenericHiveMetastoreCatalogUtil() {
 	}
 
 	// -- Utils --
 
 	/**
-	 * Creates a Hive database from CatalogDatabase.
+	 * Creates a Hive database from a CatalogDatabase.
+	 *
+	 * @param databaseName name of the database
+	 * @param catalogDatabase the CatalogDatabase instance
+	 * @return a Hive database
 	 */
-	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-		Map<String, String> props = db.getProperties();
+	public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
 		return new Database(
-			dbName,
-			db.getDescription().isPresent() ? db.getDescription().get() : null,
+			databaseName,
+			catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
 			null,
-			props);
+			catalogDatabase.getProperties());
+	}
+
+	/**
+	 * Creates a Hive table from a CatalogBaseTable.
+	 *
+	 * @param tablePath path of the table
+	 * @param table the CatalogBaseTable instance
+	 * @return a Hive table
+	 */
+	public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+
+		// Table description
+		if (table.getDescription().isPresent()) {
+			properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get());
+		}
+
+		Table hiveTable = new Table();
+		hiveTable.setDbName(tablePath.getDatabaseName());
+		hiveTable.setTableName(tablePath.getObjectName());
+		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+		// Table properties
+		hiveTable.setParameters(buildFlinkProperties(properties));
+		hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+		// Hive table's StorageDescriptor
+		StorageDescriptor sd = new StorageDescriptor();
+		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+		List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+		// Table columns and partition keys
+		CatalogTable catalogTable = (CatalogTable) table;
+
+		if (catalogTable.isPartitioned()) {
+			int partitionKeySize = catalogTable.getPartitionKeys().size();
+			List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+			List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+			sd.setCols(regularColumns);
+			hiveTable.setPartitionKeys(part
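The tail of the diff above splits the full column list into regular and partition columns with `subList`, relying on the partition keys being the trailing columns of the schema. A minimal sketch of that split, with plain strings standing in for Hive `FieldSchema` objects:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the column split used in createHiveTable: partition keys are assumed
// to be the last partitionKeySize columns, so two subList views carve the list
// into the columns stored in the StorageDescriptor and the partition keys.
class PartitionSplit {
	static List<String> regularColumns(List<String> allColumns, int partitionKeySize) {
		return allColumns.subList(0, allColumns.size() - partitionKeySize);
	}

	static List<String> partitionColumns(List<String> allColumns, int partitionKeySize) {
		return allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
	}
}
```

Note that `subList` returns a view over the backing list, which is fine here because the list is not modified afterwards.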

[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280238806
 
 

 ##
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
 	@Override
 	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
 			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			client.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), true, ignoreIfNotExists);
 
 Review comment:
   sure





[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257096
 
 

 ##
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+	// Flink tables should be stored as 'external' tables in Hive metastore
+	private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+		put("EXTERNAL", "TRUE");
+	}};
+
 	private GenericHiveMetastoreCatalogUtil() {
 	}
 
 	// -- Utils --
 
 	/**
-	 * Creates a Hive database from CatalogDatabase.
+	 * Creates a Hive database from a CatalogDatabase.
+	 *
+	 * @param databaseName name of the database
+	 * @param catalogDatabase the CatalogDatabase instance
+	 * @return a Hive database
 	 */
-	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-		Map<String, String> props = db.getProperties();
+	public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
 		return new Database(
-			dbName,
-			db.getDescription().isPresent() ? db.getDescription().get() : null,
+			databaseName,
+			catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
 			null,
-			props);
+			catalogDatabase.getProperties());
+	}
+
+	/**
+	 * Creates a Hive table from a CatalogBaseTable.
+	 *
+	 * @param tablePath path of the table
+	 * @param table the CatalogBaseTable instance
+	 * @return a Hive table
+	 */
+	public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+
+		// Table description
+		if (table.getDescription().isPresent()) {
+			properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get());
+		}
+
+		Table hiveTable = new Table();
+		hiveTable.setDbName(tablePath.getDatabaseName());
+		hiveTable.setTableName(tablePath.getObjectName());
+		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+		// Table properties
+		hiveTable.setParameters(buildFlinkProperties(properties));
+		hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+		// Hive table's StorageDescriptor
+		StorageDescriptor sd = new StorageDescriptor();
+		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+		List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+		// Table columns and partition keys
+		CatalogTable catalogTable = (CatalogTable) table;
+
+		if (catalogTable.isPartitioned()) {
+			int partitionKeySize = catalogTable.getPartitionKeys().size();
+			List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+			List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+			sd.setCols(regularColumns);
+			hiveTable.setPartitionKeys(part
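The diff references a `buildFlinkProperties` helper that is not shown in the quoted hunk. Going by the `FLINK_PROPERTY_PREFIX` comment above, a plausible sketch of its behavior is: prefix each Flink property with `flink.` when writing to the metastore so it cannot collide with properties Hive maintains itself, and strip the prefix again when reading the table back. The `retrieveFlinkProperties` name and the reverse direction are assumptions for illustration:

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the property round trip implied by FLINK_PROPERTY_PREFIX.
// buildFlinkProperties mirrors the helper name used in the diff; retrieveFlinkProperties
// is an assumed counterpart for reading a table back out of the metastore.
class FlinkProps {
	static final String FLINK_PROPERTY_PREFIX = "flink.";

	// Prefix every Flink property before storing it as a Hive table parameter.
	static Map<String, String> buildFlinkProperties(Map<String, String> props) {
		return props.entrySet().stream()
			.collect(Collectors.toMap(
				e -> FLINK_PROPERTY_PREFIX + e.getKey(),
				Map.Entry::getValue));
	}

	// Keep only the prefixed entries and strip the prefix, ignoring
	// parameters that Hive itself added (e.g. EXTERNAL=TRUE).
	static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveParams) {
		return hiveParams.entrySet().stream()
			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
			.collect(Collectors.toMap(
				e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
				Map.Entry::getValue));
	}
}
```

The round trip is lossless for Flink-owned properties while leaving Hive-owned parameters untouched.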

[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280256225
 
 

 ##
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
 	private final List<String> partitionKeys;
 	// Properties of the table
 	private final Map<String, String> properties;
-	// Comment of the table
-	private String comment = "This is a generic catalog table.";
+	// Description of the table
+	private String description = "This is a generic catalog table.";
 
 Review comment:
  I agree. To add on, I also doubt the necessity of having a predefined description and a predefined detailed description.
  
  Nevertheless, I think we'd better leave them to a different PR. I'll revert this part to keep this PR focused on what it intends to accomplish.



