This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf20197  [FLINK-12239][hive] Support table related operations in 
GenericHiveMetastoreCatalog
cf20197 is described below

commit cf20197f1c8c51eab028da7c477dd6c7ad96db2f
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Wed May 1 14:26:35 2019 -0700

    [FLINK-12239][hive] Support table related operations in 
GenericHiveMetastoreCatalog
    
    This PR enables GenericHiveMetastoreCatalog to operate Flink tables by 
using Hive metastore as a storage. Flink tables will be stored as Hive tables 
in metastore, and GenericHiveMetastoreCatalog can convert between Flink and 
Hive tables upon read and write.
    
    This closes #8329
---
 flink-connectors/flink-connector-hive/pom.xml      |   5 +-
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 114 +++++++-
 .../hive/GenericHiveMetastoreCatalogUtil.java      | 160 ++++++++++-
 .../table/catalog/hive/HiveCatalogBaseUtil.java    |  58 ++++
 .../flink/table/catalog/hive/HiveTableConfig.java  |  16 +-
 .../flink/table/catalog/hive/HiveTypeUtil.java     | 140 ++++++++++
 .../src/main/resources/META-INF/NOTICE             |   3 +
 .../hive/GenericHiveMetastoreCatalogTest.java      |  96 ++++++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |   4 +-
 .../flink/table/catalog/GenericCatalogTable.java   |  17 +-
 .../table/catalog/GenericInMemoryCatalogTest.java  | 298 ++-------------------
 .../flink/table/catalog/CatalogBaseTable.java      |  14 +-
 .../flink/table/catalog/ReadableCatalog.java       |  12 +-
 .../table/catalog/ReadableWritableCatalog.java     |   7 +-
 .../exceptions/DatabaseNotEmptyException.java      |   2 +-
 .../flink/table/catalog/CatalogTestBase.java       | 297 +++++++++++++++++++-
 .../flink/table/catalog/CatalogTestUtil.java       |  10 +-
 17 files changed, 900 insertions(+), 353 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml 
b/flink-connectors/flink-connector-hive/pom.xml
index cb09934..1b672d4 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -197,10 +197,6 @@ under the License.
                                        <artifactId>metrics-json</artifactId>
                                </exclusion>
                                <exclusion>
-                                       
<groupId>com.fasterxml.jackson.core</groupId>
-                                       
<artifactId>jackson-databind</artifactId>
-                               </exclusion>
-                               <exclusion>
                                        <groupId>com.github.joshelser</groupId>
                                        
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
                                </exclusion>
@@ -387,6 +383,7 @@ under the License.
                                                                        
<include>commons-dbcp:commons-dbcp</include>
                                                                        
<include>commons-pool:commons-pool</include>
                                                                        
<include>commons-beanutils:commons-beanutils</include>
+                                                                       
<include>com.fasterxml.jackson.core:*</include>
                                                                        
<include>com.jolbox:bonecp</include>
                                                                        
<include>org.apache.hive:*</include>
                                                                        
<include>org.apache.thrift:libthrift</include>
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index a8fcf62..50ed2e9 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -229,31 +231,97 @@ public class GenericHiveMetastoreCatalog implements 
ReadableWritableCatalog {
        @Override
        public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
                        throws TableNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+               try {
+                       client.dropTable(
+                               tablePath.getDatabaseName(),
+                               tablePath.getObjectName(),
+                               // Indicate whether associated data should be 
deleted.
+                               // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
+                               true,
+                               ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+               }
        }
 
        @Override
        public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-                       throws TableNotExistException, 
TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+                       throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+               try {
+                       // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+                       if (tableExists(tablePath)) {
+                               ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+                               // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+                               if (tableExists(newPath)) {
+                                       throw new 
TableAlreadyExistException(catalogName, newPath);
+                               } else {
+                                       Table table = getHiveTable(tablePath);
+                                       table.setTableName(newTableName);
+                                       
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+                               }
+                       } else if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+               }
        }
 
        @Override
        public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
                        throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+               if (!databaseExists(tablePath.getDatabaseName())) {
+                       throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+               } else {
+                       try {
+                               
client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, 
table));
+                       } catch (AlreadyExistsException e) {
+                               if (!ignoreIfExists) {
+                                       throw new 
TableAlreadyExistException(catalogName, tablePath);
+                               }
+                       } catch (TException e) {
+                               throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+                       }
+               }
        }
 
        @Override
-       public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+       public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
                        throws TableNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+               if (!tableExists(tablePath)) {
+                       if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } else {
+                       // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
+                       // Thus we have to translate alterTable() into 
(dropTable() + createTable())
+                       dropTable(tablePath, false);
+                       try {
+                               createTable(tablePath, newTable, false);
+                       } catch (TableAlreadyExistException | 
DatabaseNotExistException e) {
+                               // These exceptions wouldn't be thrown, unless 
a concurrent operation is triggered in Hive
+                               throw new CatalogException(
+                                       String.format("Failed to alter table 
%s", tablePath), e);
+                       }
+               }
        }
 
        @Override
-       public List<String> listTables(String databaseName)
-                       throws DatabaseNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+       public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               try {
+                       return client.getAllTables(databaseName);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list tables in 
database %s", databaseName), e);
+               }
        }
 
        @Override
@@ -262,13 +330,33 @@ public class GenericHiveMetastoreCatalog implements 
ReadableWritableCatalog {
        }
 
        @Override
-       public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+       public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+               Table hiveTable = getHiveTable(tablePath);
+
+               return 
GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
+       }
+
+       protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+               try {
+                       return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               } catch (NoSuchObjectException e) {
+                       throw new TableNotExistException(catalogName, 
tablePath);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+               }
        }
 
        @Override
-       public boolean tableExists(ObjectPath objectPath) throws 
CatalogException {
-               throw new UnsupportedOperationException();
+       public boolean tableExists(ObjectPath tablePath) throws 
CatalogException {
+               try {
+                       return client.tableExists(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               } catch (UnknownDBException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to check whether table %s 
exists or not.", tablePath.getFullName()), e);
+               }
        }
 
        // ------ partitions ------
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
index 779905a..0e564f5 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -18,32 +18,178 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+       // Prefix used to distinguish properties created by Hive and Flink,
+       // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+       private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+       // Flink tables should be stored as 'external' tables in Hive metastore
+       private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+               put("EXTERNAL", "TRUE");
+       }};
+
        private GenericHiveMetastoreCatalogUtil() {
        }
 
        // ------ Utils ------
 
        /**
-        * Creates a Hive database from CatalogDatabase.
+        * Creates a Hive database from a CatalogDatabase.
+        *
+        * @param databaseName name of the database
+        * @param catalogDatabase the CatalogDatabase instance
+        * @return a Hive database
         */
-       public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-               Map<String, String> props = db.getProperties();
+       public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
                return new Database(
-                       dbName,
-                       db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+                       databaseName,
+                       catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
                        null,
-                       props);
+                       catalogDatabase.getProperties());
+       }
+
+       /**
+        * Creates a Hive table from a CatalogBaseTable.
+        * TODO: [FLINK-12240] Support view related operations in 
GenericHiveMetastoreCatalog
+        *
+        * @param tablePath path of the table
+        * @param table the CatalogBaseTable instance
+        * @return a Hive table
+        */
+       public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+               Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+               // Table comment
+               properties.put(HiveTableConfig.TABLE_COMMENT, 
table.getComment());
+
+               Table hiveTable = new Table();
+               hiveTable.setDbName(tablePath.getDatabaseName());
+               hiveTable.setTableName(tablePath.getObjectName());
+               hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+               // Table properties
+               hiveTable.setParameters(buildFlinkProperties(properties));
+               hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+               // Hive table's StorageDescriptor
+               StorageDescriptor sd = new StorageDescriptor();
+               sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+               List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+               // Table columns and partition keys
+               if (table instanceof CatalogTable) {
+                       CatalogTable catalogTable = (CatalogTable) table;
+
+                       if (catalogTable.isPartitioned()) {
+                               int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+                               List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+                               List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+                               sd.setCols(regularColumns);
+                               hiveTable.setPartitionKeys(partitionColumns);
+                       } else {
+                               sd.setCols(allColumns);
+                               hiveTable.setPartitionKeys(new ArrayList<>());
+                       }
+
+                       hiveTable.setSd(sd);
+               } else {
+                       // TODO: [FLINK-12240] Support view related operations 
in GenericHiveMetastoreCatalog
+                       throw new UnsupportedOperationException();
+               }
+
+               return hiveTable;
+       }
+
+       /**
+        * Creates a CatalogBaseTable from a Hive table.
+        * TODO: [FLINK-12240] Support view related operations in 
GenericHiveMetastoreCatalog
+        *
+        * @param hiveTable the Hive table
+        * @return a CatalogBaseTable
+        */
+       public static CatalogBaseTable createCatalogTable(Table hiveTable) {
+               // Table schema
+               TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
+                               hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
+
+               // Table properties
+               Map<String, String> properties = 
retrieveFlinkProperties(hiveTable.getParameters());
+
+               // Table comment
+               String comment = 
properties.remove(HiveTableConfig.TABLE_COMMENT);
+
+               // Partition keys
+               List<String> partitionKeys = new ArrayList<>();
+
+               if (hiveTable.getPartitionKeys() != null && 
hiveTable.getPartitionKeys().isEmpty()) {
+                       partitionKeys = hiveTable.getPartitionKeys().stream()
+                                                               .map(fs -> 
fs.getName())
+                                                               
.collect(Collectors.toList());
+               }
+
+               return new GenericCatalogTable(
+                       tableSchema, new TableStats(0), partitionKeys, 
properties, comment);
+       }
+
+       /**
+        * Create Hive columns from Flink TableSchema.
+        */
+       private static List<FieldSchema> createHiveColumns(TableSchema schema) {
+               String[] fieldNames = schema.getFieldNames();
+               TypeInformation[] fieldTypes = schema.getFieldTypes();
+
+               List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+               for (int i = 0; i < fieldNames.length; i++) {
+                       columns.add(
+                               new FieldSchema(fieldNames[i], 
HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+               }
+
+               return columns;
+       }
+
+       /**
+        * Filter out Hive-created properties, and return Flink-created 
properties.
+        */
+       private static Map<String, String> retrieveFlinkProperties(Map<String, 
String> hiveTableParams) {
+               return hiveTableParams.entrySet().stream()
+                       .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+                       .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+       }
+
+       /**
+        * Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+        */
+       public static Map<String, String> buildFlinkProperties(Map<String, 
String> properties) {
+               return properties.entrySet().stream()
+                       .filter(e -> e.getKey() != null && e.getValue() != null)
+                       .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + 
e.getKey(), e -> e.getValue()));
        }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
new file mode 100644
index 0000000..3f93b4a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared util for catalogs backed by Hive-metastore.
+ */
+public class HiveCatalogBaseUtil {
+
+       /**
+        * Create a Flink's TableSchema from Hive table's columns and partition 
keys.
+        *
+        * @param cols columns of the Hive table
+        * @param partitionKeys partition keys of the Hive table
+        * @return a Flink TableSchema
+        */
+       public static TableSchema createTableSchema(List<FieldSchema> cols, 
List<FieldSchema> partitionKeys) {
+               List<FieldSchema> allCols = new ArrayList<>(cols);
+               allCols.addAll(partitionKeys);
+
+               String[] colNames = new String[allCols.size()];
+               TypeInformation[] colTypes = new 
TypeInformation[allCols.size()];
+
+               for (int i = 0; i < allCols.size(); i++) {
+                       FieldSchema fs = allCols.get(i);
+
+                       colNames[i] = fs.getName();
+                       colTypes[i] = 
HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+               }
+
+               return new TableSchema(colNames, colTypes);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
similarity index 61%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
copy to 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 91cf133..336d16f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -16,20 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.exceptions;
+package org.apache.flink.table.catalog.hive;
 
 /**
- * Exception for trying to drop on a database that is not empty.
- *
+ * Configs for Flink tables stored in Hive metastore.
  */
-public class DatabaseNotEmptyException extends Exception {
-       private static final String MSG = "Database %s in Catalog %s is not 
empty.";
+public class HiveTableConfig {
 
-       public DatabaseNotEmptyException(String catalog, String database, 
Throwable cause) {
-               super(String.format(MSG, database, catalog), cause);
-       }
+       // Description of the Flink table
+       public static final String TABLE_COMMENT = "comment";
 
-       public DatabaseNotEmptyException(String catalog, String database) {
-               this(catalog, database, null);
-       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
new file mode 100644
index 0000000..c665d6c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Utils to convert data types between Flink and Hive.
+ */
+public class HiveTypeUtil {
+
+       // Note: Need to keep this in sync with 
BaseSemanticAnalyzer::getTypeStringFromAST
+       private static final String HIVE_ARRAY_TYPE_NAME_FORMAT = 
serdeConstants.LIST_TYPE_NAME + "<%s>";
+
+       private HiveTypeUtil() {
+       }
+
+       /**
+        * Convert Flink data type to Hive data type.
+        * TODO: the following Hive types are not supported in Flink yet, 
including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+        *              [FLINK-12386] Support complete mapping between Flink 
and Hive data types
+        *
+        * @param type a Flink data type
+        * @return the corresponding Hive data type
+        */
+       public static String toHiveType(TypeInformation type) {
+               if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+                       return serdeConstants.BOOLEAN_TYPE_NAME;
+               } else if (type == BasicTypeInfo.BYTE_TYPE_INFO) {
+                       return serdeConstants.TINYINT_TYPE_NAME;
+               } else if (type == BasicTypeInfo.SHORT_TYPE_INFO) {
+                       return serdeConstants.SMALLINT_TYPE_NAME;
+               } else if (type == BasicTypeInfo.INT_TYPE_INFO) {
+                       return serdeConstants.INT_TYPE_NAME;
+               } else if (type == BasicTypeInfo.LONG_TYPE_INFO) {
+                       return serdeConstants.BIGINT_TYPE_NAME;
+               } else if (type == BasicTypeInfo.FLOAT_TYPE_INFO) {
+                       return serdeConstants.FLOAT_TYPE_NAME;
+               } else if (type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+                       return serdeConstants.DOUBLE_TYPE_NAME;
+               } else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+                       return serdeConstants.STRING_TYPE_NAME;
+               } else if (type == BasicTypeInfo.DATE_TYPE_INFO) {
+                       return serdeConstants.DATE_TYPE_NAME;
+               } else if (type == BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO) {
+                       return serdeConstants.BINARY_TYPE_NAME;
+               } else if (type instanceof SqlTimeTypeInfo) {
+                       return serdeConstants.TIMESTAMP_TYPE_NAME;
+               } else if (type instanceof BasicArrayTypeInfo) {
+                       return toHiveArrayType((BasicArrayTypeInfo) type);
+               } else {
+                       throw new UnsupportedOperationException(
+                               String.format("Flink doesn't support converting 
type %s to Hive type yet.", type.toString()));
+               }
+       }
+
+       private static String toHiveArrayType(BasicArrayTypeInfo arrayTypeInfo) 
{
+               return String.format(HIVE_ARRAY_TYPE_NAME_FORMAT, 
toHiveType(arrayTypeInfo.getComponentInfo()));
+       }
+
+       /**
+        * Convert Hive data type to a Flink data type.
+        * TODO: the following Hive types are not supported in Flink yet, 
including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+        *      [FLINK-12386] Support complete mapping between Flink and Hive 
data types
+        *
+        * @param hiveType a Hive data type
+        * @return the corresponding Flink data type
+        */
+       public static TypeInformation toFlinkType(TypeInfo hiveType) {
+               switch (hiveType.getCategory()) {
+                       case PRIMITIVE:
+                               return toFlinkPrimitiveType((PrimitiveTypeInfo) 
hiveType);
+                       case LIST:
+                               ListTypeInfo listTypeInfo = (ListTypeInfo) 
hiveType;
+                               return 
BasicArrayTypeInfo.getInfoFor(toFlinkType(listTypeInfo.getListElementTypeInfo()).getTypeClass());
+                       default:
+                               throw new UnsupportedOperationException(
+                                       String.format("Flink doesn't support 
Hive data type %s yet.", hiveType));
+               }
+       }
+
+       // TODO: the following Hive types are not supported in Flink yet, 
including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+       //    [FLINK-12386] Support complete mapping between Flink and Hive 
data types
+       private static TypeInformation toFlinkPrimitiveType(PrimitiveTypeInfo 
hiveType) {
+               switch (hiveType.getPrimitiveCategory()) {
+                       // For CHAR(p) and VARCHAR(p) types, map them to String 
for now because Flink doesn't yet support them.
+                       case CHAR:
+                       case VARCHAR:
+                       case STRING:
+                               return BasicTypeInfo.STRING_TYPE_INFO;
+                       case BOOLEAN:
+                               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+                       case BYTE:
+                               return BasicTypeInfo.BYTE_TYPE_INFO;
+                       case SHORT:
+                               return BasicTypeInfo.SHORT_TYPE_INFO;
+                       case INT:
+                               return BasicTypeInfo.INT_TYPE_INFO;
+                       case LONG:
+                               return BasicTypeInfo.LONG_TYPE_INFO;
+                       case FLOAT:
+                               return BasicTypeInfo.FLOAT_TYPE_INFO;
+                       case DOUBLE:
+                               return BasicTypeInfo.DOUBLE_TYPE_INFO;
+                       case DATE:
+                               return BasicTypeInfo.DATE_TYPE_INFO;
+                       case TIMESTAMP:
+                               return SqlTimeTypeInfo.TIMESTAMP;
+                       case BINARY:
+                               return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+                       default:
+                               throw new UnsupportedOperationException(
+                                       String.format("Flink doesn't support 
Hive primitive type %s yet", hiveType));
+               }
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
index ea1c160..b67affe 100644
--- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
@@ -8,6 +8,9 @@ This project bundles the following dependencies under the 
Apache Software Licens
 
 - commons-dbcp:commons-dbcp:1.4
 - commons-pool:commons-pool:1.5.4
+- com.fasterxml.jackson.core:jackson-annotations:2.6.0
+- com.fasterxml.jackson.core:jackson-core:2.6.5
+- com.fasterxml.jackson.core:jackson-databind:2.6.5
 - com.jolbox:bonecp:0.8.0.RELEASE
 - org.apache.hive:hive-common:2.3.4
 - org.apache.hive:hive-metastore:2.3.4
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 315d657..2687699 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -18,10 +18,18 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogTestUtil;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,14 +48,47 @@ public class GenericHiveMetastoreCatalogTest extends 
CatalogTestBase {
                catalog.open();
        }
 
-       // =====================
-       // GenericHiveMetastoreCatalog doesn't support table operation yet
-       // Thus, overriding the following tests which involve table operation 
in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog
-       // =====================
+       // ------ data types ------
 
-       // TODO: re-enable this test once GenericHiveMetastoreCatalog support 
table operations
        @Test
-       public void testDropDb_DatabaseNotEmptyException() throws Exception {
+       public void testDataTypes() throws Exception {
+               // TODO: the following Hive types are not supported in Flink 
yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+               //        [FLINK-12386] Support complete mapping between Flink 
and Hive data types
+               TypeInformation[] types = new TypeInformation[] {
+                       BasicTypeInfo.BYTE_TYPE_INFO,
+                       BasicTypeInfo.SHORT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.LONG_TYPE_INFO,
+                       BasicTypeInfo.FLOAT_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.BOOLEAN_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO,
+                       BasicTypeInfo.DATE_TYPE_INFO,
+                       SqlTimeTypeInfo.TIMESTAMP
+               };
+
+               verifyDataTypes(types);
+       }
+
+       private void verifyDataTypes(TypeInformation[] types) throws Exception {
+               String[] colNames = new String[types.length];
+
+               for (int i = 0; i < types.length; i++) {
+                       colNames[i] = types[i].toString().toLowerCase() + 
"_col";
+               }
+
+               CatalogTable table = new GenericCatalogTable(
+                       new TableSchema(colNames, types),
+                       new TableStats(0),
+                       getBatchTableProperties(),
+                       TEST_COMMENT
+               );
+
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
        }
 
        // ------ utils ------
@@ -77,13 +118,48 @@ public class GenericHiveMetastoreCatalogTest extends 
CatalogTestBase {
 
        @Override
        public CatalogTable createTable() {
-               // TODO: implement this once GenericHiveMetastoreCatalog 
support table operations
-               return null;
+               return new GenericCatalogTable(
+                       createTableSchema(),
+                       new TableStats(0),
+                       getBatchTableProperties(),
+                       TEST_COMMENT);
        }
 
        @Override
        public CatalogTable createAnotherTable() {
-               // TODO: implement this once GenericHiveMetastoreCatalog 
support table operations
-               return null;
+               return new GenericCatalogTable(
+                       createAnotherTableSchema(),
+                       new TableStats(0),
+                       getBatchTableProperties(),
+                       TEST_COMMENT);
+       }
+
+       @Override
+       public CatalogTable createStreamingTable() {
+               return new GenericCatalogTable(
+                       createTableSchema(),
+                       new TableStats(0),
+                       getStreamingTableProperties(),
+                       TEST_COMMENT);
+       }
+
+       @Override
+       public CatalogTable createPartitionedTable() {
+               return new GenericCatalogTable(
+                       createTableSchema(),
+                       new TableStats(0),
+                       createPartitionKeys(),
+                       getBatchTableProperties(),
+                       TEST_COMMENT);
+       }
+
+       @Override
+       public CatalogTable createAnotherPartitionedTable() {
+               return new GenericCatalogTable(
+                       createAnotherTableSchema(),
+                       new TableStats(0),
+                       createPartitionKeys(),
+                       getBatchTableProperties(),
+                       TEST_COMMENT);
        }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 83f5bed..4a32313 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.table.catalog.CatalogTestBase;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.junit.rules.TemporaryFolder;
 
@@ -35,7 +37,7 @@ public class HiveTestUtils {
         * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
         */
        public static GenericHiveMetastoreCatalog 
createGenericHiveMetastoreCatalog() throws IOException {
-               return new GenericHiveMetastoreCatalog("test", getHiveConf());
+               return new 
GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
        }
 
        private static HiveConf getHiveConf() throws IOException {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 47498e7..73c2dbc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -61,8 +61,8 @@ public class GenericCatalogTable implements CatalogTable {
                        TableSchema tableSchema,
                        TableStats tableStats,
                        Map<String, String> properties,
-                       String comment) {
-               this(tableSchema, tableStats, new ArrayList<>(), properties, 
comment);
+                       String description) {
+               this(tableSchema, tableStats, new ArrayList<>(), properties, 
description);
        }
 
        @Override
@@ -91,6 +91,11 @@ public class GenericCatalogTable implements CatalogTable {
        }
 
        @Override
+       public String getComment() {
+               return comment;
+       }
+
+       @Override
        public GenericCatalogTable copy() {
                return new GenericCatalogTable(
                        this.tableSchema.copy(), this.tableStats.copy(), new 
ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
@@ -106,12 +111,4 @@ public class GenericCatalogTable implements CatalogTable {
                return Optional.of("This is a catalog table in an im-memory 
catalog");
        }
 
-       public String getComment() {
-               return this.comment;
-       }
-
-       public void setComment(String comment) {
-               this.comment = comment;
-       }
-
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f88db0d..40a0d9f 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -40,7 +37,6 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -62,15 +58,6 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
 
        @After
        public void close() throws Exception {
-               if (catalog.tableExists(path1)) {
-                       catalog.dropTable(path1, true);
-               }
-               if (catalog.tableExists(path2)) {
-                       catalog.dropTable(path2, true);
-               }
-               if (catalog.tableExists(path3)) {
-                       catalog.dropTable(path3, true);
-               }
                if (catalog.functionExists(path1)) {
                        catalog.dropFunction(path1, true);
                }
@@ -79,108 +66,6 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
        // ------ tables ------
 
        @Test
-       public void testCreateTable_Streaming() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createStreamingTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-       }
-
-       @Test
-       public void testCreateTable_Batch() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               // Non-partitioned table
-               CatalogTable table = createTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogBaseTable tableCreated = catalog.getTable(path1);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
-               assertEquals(TABLE_COMMENT, 
tableCreated.getDescription().get());
-
-               List<String> tables = catalog.listTables(db1);
-
-               assertEquals(1, tables.size());
-               assertEquals(path1.getObjectName(), tables.get(0));
-
-               catalog.dropTable(path1, false);
-
-               // Partitioned table
-               table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-
-               tables = catalog.listTables(db1);
-
-               assertEquals(1, tables.size());
-               assertEquals(path1.getObjectName(), tables.get(0));
-       }
-
-       @Test
-       public void testCreateTable_DatabaseNotExistException() throws 
Exception {
-               assertFalse(catalog.databaseExists(db1));
-
-               exception.expect(DatabaseNotExistException.class);
-               exception.expectMessage("Database db1 does not exist in 
Catalog");
-               catalog.createTable(nonExistObjectPath, createTable(), false);
-       }
-
-       @Test
-       public void testCreateTable_TableAlreadyExistException() throws 
Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1,  createTable(), false);
-
-               exception.expect(TableAlreadyExistException.class);
-               exception.expectMessage("Table (or view) db1.t1 already exists 
in Catalog");
-               catalog.createTable(path1, createTable(), false);
-       }
-
-       @Test
-       public void testCreateTable_TableAlreadyExist_ignored() throws 
Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               CatalogTable table = createTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-
-               catalog.createTable(path1, createAnotherTable(), true);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-       }
-
-       @Test
-       public void testGetTable_TableNotExistException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               exception.expect(TableNotExistException.class);
-               exception.expectMessage("Table (or view) db1.nonexist does not 
exist in Catalog");
-               catalog.getTable(nonExistObjectPath);
-       }
-
-       @Test
-       public void testGetTable_TableNotExistException_NoDb() throws Exception 
{
-               exception.expect(TableNotExistException.class);
-               exception.expectMessage("Table (or view) db1.nonexist does not 
exist in Catalog");
-               catalog.getTable(nonExistObjectPath);
-       }
-
-       @Test
-       public void testDropTable_nonPartitionedTable() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createTable(), false);
-
-               assertTrue(catalog.tableExists(path1));
-
-               catalog.dropTable(path1, false);
-
-               assertFalse(catalog.tableExists(path1));
-       }
-
-       @Test
        public void testDropTable_partitionedTable() throws Exception {
                catalog.createDatabase(db1, createDb(), false);
                catalog.createTable(path1, createPartitionedTable(), false);
@@ -214,78 +99,6 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
        }
 
        @Test
-       public void testDropTable_TableNotExistException() throws Exception {
-               exception.expect(TableNotExistException.class);
-               exception.expectMessage("Table (or view) non.exist does not 
exist in Catalog");
-               catalog.dropTable(nonExistDbPath, false);
-       }
-
-       @Test
-       public void testDropTable_TableNotExist_ignored() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.dropTable(nonExistObjectPath, true);
-       }
-
-       @Test
-       public void testAlterTable() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               // Non-partitioned table
-               CatalogTable table = createTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-
-               CatalogTable newTable = createAnotherTable();
-               catalog.alterTable(path1, newTable, false);
-
-               assertNotEquals(table, catalog.getTable(path1));
-               CatalogTestUtil.checkEquals(newTable, (CatalogTable) 
catalog.getTable(path1));
-
-               catalog.dropTable(path1, false);
-
-               // Partitioned table
-               table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-
-               newTable = createAnotherPartitionedTable();
-               catalog.alterTable(path1, newTable, false);
-
-               CatalogTestUtil.checkEquals(newTable, (CatalogTable) 
catalog.getTable(path1));
-       }
-
-       @Test
-       public void testAlterTable_TableNotExistException() throws Exception {
-               exception.expect(TableNotExistException.class);
-               exception.expectMessage("Table (or view) non.exist does not 
exist in Catalog");
-               catalog.alterTable(nonExistDbPath, createTable(), false);
-       }
-
-       @Test
-       public void testAlterTable_TableNotExist_ignored() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.alterTable(nonExistObjectPath, createTable(), true);
-
-               assertFalse(catalog.tableExists(nonExistObjectPath));
-       }
-
-       @Test
-       public void testRenameTable_nonPartitionedTable() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
-
-               catalog.renameTable(path1, t2, false);
-
-               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path3));
-               assertFalse(catalog.tableExists(path1));
-       }
-
-       @Test
        public void testRenameTable_partitionedTable() throws Exception {
                catalog.createDatabase(db1, createDb(), false);
                CatalogTable table = createPartitionedTable();
@@ -305,44 +118,6 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
                assertFalse(catalog.partitionExists(path1, 
catalogPartitionSpec));
        }
 
-       @Test
-       public void testRenameTable_TableNotExistException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               exception.expect(TableNotExistException.class);
-               exception.expectMessage("Table (or view) db1.t1 does not exist 
in Catalog");
-               catalog.renameTable(path1, t2, false);
-       }
-
-       @Test
-       public void testRenameTable_TableNotExistException_ignored() throws 
Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.renameTable(path1, t2, true);
-       }
-
-       @Test
-       public void testRenameTable_TableAlreadyExistException() throws 
Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createTable();
-               catalog.createTable(path1, table, false);
-               catalog.createTable(path3, createAnotherTable(), false);
-
-               exception.expect(TableAlreadyExistException.class);
-               exception.expectMessage("Table (or view) db1.t2 already exists 
in Catalog");
-               catalog.renameTable(path1, t2, false);
-       }
-
-       @Test
-       public void testTableExists() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               assertFalse(catalog.tableExists(path1));
-
-               catalog.createTable(path1, createTable(), false);
-
-               assertTrue(catalog.tableExists(path1));
-       }
-
        // ------ views ------
 
        @Test
@@ -951,24 +726,29 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
 
        @Override
        public CatalogDatabase createDb() {
-               return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
-                       put("k1", "v1");
-               }}, TEST_COMMENT);
+               return new GenericCatalogDatabase(
+                       new HashMap<String, String>() {{
+                               put("k1", "v1");
+                       }},
+                       TEST_COMMENT);
        }
 
        @Override
        public CatalogDatabase createAnotherDb() {
-               return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
-                       put("k2", "v2");
-               }}, "this is another database.");
+               return new GenericCatalogDatabase(
+                       new HashMap<String, String>() {{
+                               put("k2", "v2");
+                       }},
+                       "this is another database.");
        }
 
-       private GenericCatalogTable createStreamingTable() {
+       @Override
+       public GenericCatalogTable createStreamingTable() {
                return new GenericCatalogTable(
                        createTableSchema(),
                        new TableStats(0),
                        getStreamingTableProperties(),
-                       TABLE_COMMENT);
+                       TEST_COMMENT);
        }
 
        @Override
@@ -977,7 +757,7 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
                        createTableSchema(),
                        new TableStats(0),
                        getBatchTableProperties(),
-                       TABLE_COMMENT);
+                       TEST_COMMENT);
        }
 
        @Override
@@ -986,29 +766,27 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
                        createAnotherTableSchema(),
                        new TableStats(0),
                        getBatchTableProperties(),
-                       TABLE_COMMENT);
+                       TEST_COMMENT);
        }
 
-       protected CatalogTable createPartitionedTable() {
+       @Override
+       public CatalogTable createPartitionedTable() {
                return new GenericCatalogTable(
                        createTableSchema(),
                        new TableStats(0),
                        createPartitionKeys(),
                        getBatchTableProperties(),
-                       TABLE_COMMENT);
+                       TEST_COMMENT);
        }
 
-       protected CatalogTable createAnotherPartitionedTable() {
+       @Override
+       public CatalogTable createAnotherPartitionedTable() {
                return new GenericCatalogTable(
                        createAnotherTableSchema(),
                        new TableStats(0),
                        createPartitionKeys(),
                        getBatchTableProperties(),
-                       TABLE_COMMENT);
-       }
-
-       private List<String> createPartitionKeys() {
-               return Arrays.asList("second", "third");
+                       TEST_COMMENT);
        }
 
        private CatalogPartitionSpec createPartitionSpec() {
@@ -1053,40 +831,6 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
                return new GenericCatalogPartition(props);
        }
 
-       private Map<String, String> getBatchTableProperties() {
-               return new HashMap<String, String>() {{
-                       put(IS_STREAMING, "false");
-               }};
-       }
-
-       private Map<String, String> getStreamingTableProperties() {
-               return new HashMap<String, String>() {{
-                       put(IS_STREAMING, "true");
-               }};
-       }
-
-       private TableSchema createTableSchema() {
-               return new TableSchema(
-                       new String[] {"first", "second", "third"},
-                       new TypeInformation[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                       }
-               );
-       }
-
-       private TableSchema createAnotherTableSchema() {
-               return new TableSchema(
-                       new String[] {"first2", "second", "third"},
-                       new TypeInformation[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO
-                       }
-               );
-       }
-
        private CatalogView createView() {
                return new GenericCatalogView(
                        String.format("select * from %s", t1),
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
index f916354..e2157e0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
@@ -30,18 +30,28 @@ import java.util.Optional;
 public interface CatalogBaseTable {
        /**
         * Get the properties of the table.
-        * @return table property map
+        *
+        * @return property map of the table/view
         */
        Map<String, String> getProperties();
 
        /**
         * Get the schema of the table.
-        * @return schema of the table
+        *
+        * @return schema of the table/view.
         */
        TableSchema getSchema();
 
        /**
+        * Get comment of the table or view.
+        *
+        * @return comment of the table/view.
+        */
+       String getComment();
+
+       /**
         * Get a deep copy of the CatalogBaseTable instance.
+        *
         * @return a copy of the CatalogBaseTable instance
         */
        CatalogBaseTable copy();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index a8a69f2..5586348 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -96,6 +96,8 @@ public interface ReadableCatalog {
         */
        boolean databaseExists(String databaseName) throws CatalogException;
 
+       // ------ tables and views ------
+
        /**
         * Get names of all tables and views under this database. An empty list 
is returned if none exists.
         *
@@ -116,24 +118,24 @@ public interface ReadableCatalog {
        List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException;
 
        /**
-        * Get a CatalogTable or CatalogView identified by objectPath.
+        * Get a CatalogTable or CatalogView identified by tablePath.
         *
-        * @param objectPath            Path of the table or view
+        * @param tablePath             Path of the table or view
         * @return The requested table or view
         * @throws TableNotExistException if the target does not exist
         * @throws CatalogException in case of any runtime exception
         */
-       CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException;
+       CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException;
 
        /**
         * Check if a table or view exists in this catalog.
         *
-        * @param objectPath    Path of the table or view
+        * @param tablePath    Path of the table or view
         * @return true if the given table exists in the catalog
         *         false otherwise
         * @throws CatalogException in case of any runtime exception
         */
-       boolean tableExists(ObjectPath objectPath) throws CatalogException;
+       boolean tableExists(ObjectPath tablePath) throws CatalogException;
 
        // ------ partitions ------
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 04d9bcb..60bc93d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -102,11 +102,10 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
         *                          if set to false, throw an exception,
         *                          if set to true, do nothing.
         * @throws TableNotExistException if the table does not exist
-        * @throws DatabaseNotExistException if the database in tablePath to 
doesn't exist
         * @throws CatalogException in case of any runtime exception
         */
        void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
-               throws TableNotExistException, TableAlreadyExistException, 
DatabaseNotExistException, CatalogException;
+               throws TableNotExistException, TableAlreadyExistException, 
CatalogException;
 
        /**
         * Create a new table or view.
@@ -128,7 +127,7 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
         * Note that the new and old CatalogBaseTable must be of the same type. 
For example, this doesn't
         * allow alter a regular table to partitioned table, or alter a view to 
a table, and vice versa.
         *
-        * @param tableName path of the table or view to be modified
+        * @param tablePath path of the table or view to be modified
         * @param newTable the new table definition
         * @param ignoreIfNotExists flag to specify behavior when the table or 
view does not exist:
         *                          if set to false, throw an exception,
@@ -136,7 +135,7 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
         * @throws TableNotExistException if the table does not exist
         * @throws CatalogException in case of any runtime exception
         */
-       void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+       void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
                throws TableNotExistException, CatalogException;
 
        // ------ partitions ------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
index 91cf133..423d141 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
@@ -23,7 +23,7 @@ package org.apache.flink.table.catalog.exceptions;
  *
  */
 public class DatabaseNotEmptyException extends Exception {
-       private static final String MSG = "Database %s in Catalog %s is not 
empty.";
+       private static final String MSG = "Database %s in catalog %s is not 
empty.";
 
        public DatabaseNotEmptyException(String catalog, String database, 
Throwable cause) {
                super(String.format(MSG, database, catalog), cause);
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 5457d34..8117c82 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -29,11 +34,14 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -55,9 +63,9 @@ public abstract class CatalogTestBase {
        protected final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
        protected final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
 
-       protected static final String TEST_CATALOG_NAME = "test-catalog";
+       public static final String TEST_CATALOG_NAME = "test-catalog";
+
        protected static final String TEST_COMMENT = "test comment";
-       protected static final String TABLE_COMMENT = "This is my batch table";
 
        protected static ReadableWritableCatalog catalog;
 
@@ -66,6 +74,16 @@ public abstract class CatalogTestBase {
 
        @After
        public void cleanup() throws Exception {
+               if (catalog.tableExists(path1)) {
+                       catalog.dropTable(path1, true);
+               }
+               if (catalog.tableExists(path2)) {
+                       catalog.dropTable(path2, true);
+               }
+               if (catalog.tableExists(path3)) {
+                       catalog.dropTable(path3, true);
+               }
+
                if (catalog.databaseExists(db1)) {
                        catalog.dropDatabase(db1, true);
                }
@@ -168,7 +186,7 @@ public abstract class CatalogTestBase {
                catalog.createTable(path1, createTable(), false);
 
                exception.expect(DatabaseNotEmptyException.class);
-               exception.expectMessage("Database db1 in Catalog test-catalog 
is not empty");
+               exception.expectMessage("Database db1 in catalog test-catalog 
is not empty");
                catalog.dropDatabase(db1, true);
        }
 
@@ -209,6 +227,220 @@ public abstract class CatalogTestBase {
                assertTrue(catalog.databaseExists(db1));
        }
 
+       // ------ tables ------
+
+       @Test
+       public void testCreateTable_Streaming() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createStreamingTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+       }
+
+       @Test
+       public void testCreateTable_Batch() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               // Non-partitioned table
+               CatalogTable table = createTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogBaseTable tableCreated = catalog.getTable(path1);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
+               assertEquals(TEST_COMMENT, tableCreated.getDescription().get());
+
+               List<String> tables = catalog.listTables(db1);
+
+               assertEquals(1, tables.size());
+               assertEquals(path1.getObjectName(), tables.get(0));
+
+               catalog.dropTable(path1, false);
+
+               // Partitioned table
+               table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+
+               tables = catalog.listTables(db1);
+
+               assertEquals(1, tables.size());
+               assertEquals(path1.getObjectName(), tables.get(0));
+       }
+
+       @Test
+       public void testCreateTable_DatabaseNotExistException() throws 
Exception {
+               assertFalse(catalog.databaseExists(db1));
+
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database db1 does not exist in 
Catalog");
+               catalog.createTable(nonExistObjectPath, createTable(), false);
+       }
+
+       @Test
+       public void testCreateTable_TableAlreadyExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1,  createTable(), false);
+
+               exception.expect(TableAlreadyExistException.class);
+               exception.expectMessage("Table (or view) db1.t1 already exists 
in Catalog");
+               catalog.createTable(path1, createTable(), false);
+       }
+
+       @Test
+       public void testCreateTable_TableAlreadyExist_ignored() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               CatalogTable table = createTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+
+               catalog.createTable(path1, createAnotherTable(), true);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+       }
+
+       @Test
+       public void testGetTable_TableNotExistException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage("Table (or view) db1.nonexist does not 
exist in Catalog");
+               catalog.getTable(nonExistObjectPath);
+       }
+
+       @Test
+       public void testGetTable_TableNotExistException_NoDb() throws Exception 
{
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage("Table (or view) db1.nonexist does not 
exist in Catalog");
+               catalog.getTable(nonExistObjectPath);
+       }
+
+       @Test
+       public void testDropTable_nonPartitionedTable() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               assertTrue(catalog.tableExists(path1));
+
+               catalog.dropTable(path1, false);
+
+               assertFalse(catalog.tableExists(path1));
+       }
+
+       @Test
+       public void testDropTable_TableNotExistException() throws Exception {
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage("Table (or view) non.exist does not 
exist in Catalog");
+               catalog.dropTable(nonExistDbPath, false);
+       }
+
+       @Test
+       public void testDropTable_TableNotExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.dropTable(nonExistObjectPath, true);
+       }
+
+       @Test
+       public void testAlterTable() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               // Non-partitioned table
+               CatalogTable table = createTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+
+               CatalogTable newTable = createAnotherTable();
+               catalog.alterTable(path1, newTable, false);
+
+               assertNotEquals(table, catalog.getTable(path1));
+               CatalogTestUtil.checkEquals(newTable, (CatalogTable) 
catalog.getTable(path1));
+
+               catalog.dropTable(path1, false);
+
+               // Partitioned table
+               table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+
+               newTable = createAnotherPartitionedTable();
+               catalog.alterTable(path1, newTable, false);
+
+               CatalogTestUtil.checkEquals(newTable, (CatalogTable) 
catalog.getTable(path1));
+       }
+
+       @Test
+       public void testAlterTable_TableNotExistException() throws Exception {
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage("Table (or view) non.exist does not 
exist in Catalog");
+               catalog.alterTable(nonExistDbPath, createTable(), false);
+       }
+
+       @Test
+       public void testAlterTable_TableNotExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.alterTable(nonExistObjectPath, createTable(), true);
+
+               assertFalse(catalog.tableExists(nonExistObjectPath));
+       }
+
+       @Test
+       public void testRenameTable_nonPartitionedTable() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path1));
+
+               catalog.renameTable(path1, t2, false);
+
+               CatalogTestUtil.checkEquals(table, (CatalogTable) 
catalog.getTable(path3));
+               assertFalse(catalog.tableExists(path1));
+       }
+
+       @Test
+       public void testRenameTable_TableNotExistException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage("Table (or view) db1.t1 does not exist 
in Catalog");
+               catalog.renameTable(path1, t2, false);
+       }
+
+       @Test
+       public void testRenameTable_TableNotExistException_ignored() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.renameTable(path1, t2, true);
+       }
+
+       @Test
+       public void testRenameTable_TableAlreadyExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createTable();
+               catalog.createTable(path1, table, false);
+               catalog.createTable(path3, createAnotherTable(), false);
+
+               exception.expect(TableAlreadyExistException.class);
+               exception.expectMessage("Table (or view) db1.t2 already exists 
in Catalog");
+               catalog.renameTable(path1, t2, false);
+       }
+
+       @Test
+       public void testTableExists() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               assertFalse(catalog.tableExists(path1));
+
+               catalog.createTable(path1, createTable(), false);
+
+               assertTrue(catalog.tableExists(path1));
+       }
+
        // ------ utilities ------
 
        /**
@@ -245,4 +477,63 @@ public abstract class CatalogTestBase {
         * @return another CatalogTable instance
         */
        public abstract CatalogTable createAnotherTable();
+
+       /**
+        * Create a streaming CatalogTable instance by specific catalog 
implementation.
+        *
+        * @return a streaming CatalogTable instance
+        */
+       public abstract CatalogTable createStreamingTable();
+
+       /**
+        * Create a partitioned CatalogTable instance by specific catalog 
implementation.
+        *
+        * @return a streaming CatalogTable instance
+        */
+       public abstract CatalogTable createPartitionedTable();
+
+       /**
+        * Create another partitioned CatalogTable instance by specific catalog 
implementation.
+        *
+        * @return another partitioned CatalogTable instance
+        */
+       public abstract CatalogTable createAnotherPartitionedTable();
+
+       protected TableSchema createTableSchema() {
+               return new TableSchema(
+                       new String[] {"first", "second", "third"},
+                       new TypeInformation[] {
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                       }
+               );
+       }
+
+       protected TableSchema createAnotherTableSchema() {
+               return new TableSchema(
+                       new String[] {"first2", "second", "third"},
+                       new TypeInformation[] {
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO
+                       }
+               );
+       }
+
+       protected List<String> createPartitionKeys() {
+               return Arrays.asList("second", "third");
+       }
+
+       protected Map<String, String> getBatchTableProperties() {
+               return new HashMap<String, String>() {{
+                       put(IS_STREAMING, "false");
+               }};
+       }
+
+       protected Map<String, String> getStreamingTableProperties() {
+               return new HashMap<String, String>() {{
+                       put(IS_STREAMING, "true");
+               }};
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 0700736..57341fe 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,26 +33,26 @@ public class CatalogTestUtil {
                assertEquals(t1.getDescription(), t2.getDescription());
        }
 
-       protected static void checkEquals(TableStats ts1, TableStats ts2) {
+       public static void checkEquals(TableStats ts1, TableStats ts2) {
                assertEquals(ts1.getRowCount(), ts2.getRowCount());
                assertEquals(ts1.getColumnStats().size(), 
ts2.getColumnStats().size());
        }
 
-       protected static void checkEquals(CatalogView v1, CatalogView v2) {
+       public static void checkEquals(CatalogView v1, CatalogView v2) {
                assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
                assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
        }
 
-       protected static void checkEquals(CatalogDatabase d1, CatalogDatabase 
d2) {
+       public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
                assertEquals(d1.getProperties(), d2.getProperties());
        }
 
-       protected static void checkEquals(CatalogFunction f1, CatalogFunction 
f2) {
+       public static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
                assertEquals(f1.getClassName(), f2.getClassName());
                assertEquals(f1.getProperties(), f2.getProperties());
        }
 
-       protected static void checkEquals(CatalogPartition p1, CatalogPartition 
p2) {
+       public static void checkEquals(CatalogPartition p1, CatalogPartition 
p2) {
                assertEquals(p1.getProperties(), p2.getProperties());
        }
 }

Reply via email to