This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new cf20197 [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog cf20197 is described below commit cf20197f1c8c51eab028da7c477dd6c7ad96db2f Author: bowen.li <bowenl...@gmail.com> AuthorDate: Wed May 1 14:26:35 2019 -0700 [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog This PR enables GenericHiveMetastoreCatalog to operate Flink tables by using Hive metastore as a storage. Flink tables will be stored as Hive tables in metastore, and GenericHiveMetastoreCatalog can convert between Flink and Hive tables upon read and write. This closes #8329 --- flink-connectors/flink-connector-hive/pom.xml | 5 +- .../catalog/hive/GenericHiveMetastoreCatalog.java | 114 +++++++- .../hive/GenericHiveMetastoreCatalogUtil.java | 160 ++++++++++- .../table/catalog/hive/HiveCatalogBaseUtil.java | 58 ++++ .../flink/table/catalog/hive/HiveTableConfig.java | 16 +- .../flink/table/catalog/hive/HiveTypeUtil.java | 140 ++++++++++ .../src/main/resources/META-INF/NOTICE | 3 + .../hive/GenericHiveMetastoreCatalogTest.java | 96 ++++++- .../flink/table/catalog/hive/HiveTestUtils.java | 4 +- .../flink/table/catalog/GenericCatalogTable.java | 17 +- .../table/catalog/GenericInMemoryCatalogTest.java | 298 ++------------------- .../flink/table/catalog/CatalogBaseTable.java | 14 +- .../flink/table/catalog/ReadableCatalog.java | 12 +- .../table/catalog/ReadableWritableCatalog.java | 7 +- .../exceptions/DatabaseNotEmptyException.java | 2 +- .../flink/table/catalog/CatalogTestBase.java | 297 +++++++++++++++++++- .../flink/table/catalog/CatalogTestUtil.java | 10 +- 17 files changed, 900 insertions(+), 353 deletions(-) diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index cb09934..1b672d4 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -197,10 +197,6 @@ 
under the License. <artifactId>metrics-json</artifactId> </exclusion> <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </exclusion> - <exclusion> <groupId>com.github.joshelser</groupId> <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId> </exclusion> @@ -387,6 +383,7 @@ under the License. <include>commons-dbcp:commons-dbcp</include> <include>commons-pool:commons-pool</include> <include>commons-beanutils:commons-beanutils</include> + <include>com.fasterxml.jackson.core:*</include> <include>com.jolbox:bonecp</include> <include>org.apache.hive:*</include> <include>org.apache.thrift:libthrift</include> diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java index a8fcf62..50ed2e9 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -229,31 +231,97 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog { @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + 
client.dropTable( + tablePath.getDatabaseName(), + tablePath.getObjectName(), + // Indicate whether associated data should be deleted. + // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary + true, + ignoreIfNotExists); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to drop table %s", tablePath.getFullName()), e); + } } @Override public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } } @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } } @Override - public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case + // Thus we have to translate alterTable() into (dropTable() + createTable()) + dropTable(tablePath, false); + try { + createTable(tablePath, newTable, 
false); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + // These exceptions wouldn't be thrown, unless a concurrent operation is triggered in Hive + throw new CatalogException( + String.format("Failed to alter table %s", tablePath), e); + } + } } @Override - public List<String> listTables(String databaseName) - throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + try { + return client.getAllTables(databaseName); + } catch (UnknownDBException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to list tables in database %s", databaseName), e); + } } @Override @@ -262,13 +330,33 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog { } @Override - public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + Table hiveTable = getHiveTable(tablePath); + + return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable); + } + + protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { + try { + return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (NoSuchObjectException e) { + throw new TableNotExistException(catalogName, tablePath); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); + } } @Override - public boolean tableExists(ObjectPath objectPath) throws CatalogException { - throw new UnsupportedOperationException(); + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + try { + return 
client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (UnknownDBException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e); + } } // ------ partitions ------ diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java index 779905a..0e564f5 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java @@ -18,32 +18,178 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. 
*/ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // ------ Utils ------ /** - * Creates a Hive database from CatalogDatabase. + * Creates a Hive database from a CatalogDatabase. + * + * @param databaseName name of the database + * @param catalogDatabase the CatalogDatabase instance + * @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map<String, String> props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** + * Creates a Hive table from a CatalogBaseTable. 
+ * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog + * + * @param tablePath path of the table + * @param table the CatalogBaseTable instance + * @return a Hive table + */ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map<String, String> properties = new HashMap<>(table.getProperties()); + + // Table comment + properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment()); + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List<FieldSchema> allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + if (table instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); + hiveTable.setPartitionKeys(partitionColumns); + } else { + sd.setCols(allColumns); + hiveTable.setPartitionKeys(new ArrayList<>()); + } + + hiveTable.setSd(sd); + } else { + // TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog + throw new UnsupportedOperationException(); + } + + return hiveTable; + } + + /** + * Creates a CatalogBaseTable from a Hive table. 
+ * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog + * + * @param hiveTable the Hive table + * @return a CatalogBaseTable + */ + public static CatalogBaseTable createCatalogTable(Table hiveTable) { + // Table schema + TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema( + hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); + + // Table properties + Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters()); + + // Table comment + String comment = properties.remove(HiveTableConfig.TABLE_COMMENT); + + // Partition keys + List<String> partitionKeys = new ArrayList<>(); + + if (hiveTable.getPartitionKeys() != null && hiveTable.getPartitionKeys().isEmpty()) { + partitionKeys = hiveTable.getPartitionKeys().stream() + .map(fs -> fs.getName()) + .collect(Collectors.toList()); + } + + return new GenericCatalogTable( + tableSchema, new TableStats(0), partitionKeys, properties, comment); + } + + /** + * Create Hive columns from Flink TableSchema. + */ + private static List<FieldSchema> createHiveColumns(TableSchema schema) { + String[] fieldNames = schema.getFieldNames(); + TypeInformation[] fieldTypes = schema.getFieldTypes(); + + List<FieldSchema> columns = new ArrayList<>(fieldNames.length); + + for (int i = 0; i < fieldNames.length; i++) { + columns.add( + new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null)); + } + + return columns; + } + + /** + * Filter out Hive-created properties, and return Flink-created properties. + */ + private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** + * Add a prefix to Flink-created properties to distinguish them from Hive-created properties. 
+ */ + public static Map<String, String> buildFlinkProperties(Map<String, String> properties) { + return properties.entrySet().stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue())); } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java new file mode 100644 index 0000000..3f93b4a --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Shared util for catalogs backed by Hive-metastore. 
+ */ +public class HiveCatalogBaseUtil { + + /** + * Create a Flink's TableSchema from Hive table's columns and partition keys. + * + * @param cols columns of the Hive table + * @param partitionKeys partition keys of the Hive table + * @return a Flink TableSchema + */ + public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) { + List<FieldSchema> allCols = new ArrayList<>(cols); + allCols.addAll(partitionKeys); + + String[] colNames = new String[allCols.size()]; + TypeInformation[] colTypes = new TypeInformation[allCols.size()]; + + for (int i = 0; i < allCols.size(); i++) { + FieldSchema fs = allCols.get(i); + + colNames[i] = fs.getName(); + colTypes[i] = HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())); + } + + return new TableSchema(colNames, colTypes); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java similarity index 61% copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java index 91cf133..336d16f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java @@ -16,20 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.catalog.exceptions; +package org.apache.flink.table.catalog.hive; /** - * Exception for trying to drop on a database that is not empty. - * + * Configs for Flink tables stored in Hive metastore. 
*/ -public class DatabaseNotEmptyException extends Exception { - private static final String MSG = "Database %s in Catalog %s is not empty."; +public class HiveTableConfig { - public DatabaseNotEmptyException(String catalog, String database, Throwable cause) { - super(String.format(MSG, database, catalog), cause); - } + // Description of the Flink table + public static final String TABLE_COMMENT = "comment"; - public DatabaseNotEmptyException(String catalog, String database) { - this(catalog, database, null); - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java new file mode 100644 index 0000000..c665d6c --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +/** + * Utils to convert data types between Flink and Hive. + */ +public class HiveTypeUtil { + + // Note: Need to keep this in sync with BaseSemanticAnalyzer::getTypeStringFromAST + private static final String HIVE_ARRAY_TYPE_NAME_FORMAT = serdeConstants.LIST_TYPE_NAME + "<%s>"; + + private HiveTypeUtil() { + } + + /** + * Convert Flink data type to Hive data type. + * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT + * [FLINK-12386] Support complete mapping between Flink and Hive data types + * + * @param type a Flink data type + * @return the corresponding Hive data type + */ + public static String toHiveType(TypeInformation type) { + if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) { + return serdeConstants.BOOLEAN_TYPE_NAME; + } else if (type == BasicTypeInfo.BYTE_TYPE_INFO) { + return serdeConstants.TINYINT_TYPE_NAME; + } else if (type == BasicTypeInfo.SHORT_TYPE_INFO) { + return serdeConstants.SMALLINT_TYPE_NAME; + } else if (type == BasicTypeInfo.INT_TYPE_INFO) { + return serdeConstants.INT_TYPE_NAME; + } else if (type == BasicTypeInfo.LONG_TYPE_INFO) { + return serdeConstants.BIGINT_TYPE_NAME; + } else if (type == BasicTypeInfo.FLOAT_TYPE_INFO) { + return serdeConstants.FLOAT_TYPE_NAME; + } else if (type == BasicTypeInfo.DOUBLE_TYPE_INFO) { + return serdeConstants.DOUBLE_TYPE_NAME; + } else if (type == BasicTypeInfo.STRING_TYPE_INFO) { + return 
serdeConstants.STRING_TYPE_NAME; + } else if (type == BasicTypeInfo.DATE_TYPE_INFO) { + return serdeConstants.DATE_TYPE_NAME; + } else if (type == BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO) { + return serdeConstants.BINARY_TYPE_NAME; + } else if (type instanceof SqlTimeTypeInfo) { + return serdeConstants.TIMESTAMP_TYPE_NAME; + } else if (type instanceof BasicArrayTypeInfo) { + return toHiveArrayType((BasicArrayTypeInfo) type); + } else { + throw new UnsupportedOperationException( + String.format("Flink doesn't support converting type %s to Hive type yet.", type.toString())); + } + } + + private static String toHiveArrayType(BasicArrayTypeInfo arrayTypeInfo) { + return String.format(HIVE_ARRAY_TYPE_NAME_FORMAT, toHiveType(arrayTypeInfo.getComponentInfo())); + } + + /** + * Convert Hive data type to a Flink data type. + * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT + * [FLINK-12386] Support complete mapping between Flink and Hive data types + * + * @param hiveType a Hive data type + * @return the corresponding Flink data type + */ + public static TypeInformation toFlinkType(TypeInfo hiveType) { + switch (hiveType.getCategory()) { + case PRIMITIVE: + return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType); + case LIST: + ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType; + return BasicArrayTypeInfo.getInfoFor(toFlinkType(listTypeInfo.getListElementTypeInfo()).getTypeClass()); + default: + throw new UnsupportedOperationException( + String.format("Flink doesn't support Hive data type %s yet.", hiveType)); + } + } + + // TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT + // [FLINK-12386] Support complete mapping between Flink and Hive data types + private static TypeInformation toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) { + switch (hiveType.getPrimitiveCategory()) { + // For CHAR(p) and VARCHAR(p) types, map them to String for now because 
Flink doesn't yet support them. + case CHAR: + case VARCHAR: + case STRING: + return BasicTypeInfo.STRING_TYPE_INFO; + case BOOLEAN: + return BasicTypeInfo.BOOLEAN_TYPE_INFO; + case BYTE: + return BasicTypeInfo.BYTE_TYPE_INFO; + case SHORT: + return BasicTypeInfo.SHORT_TYPE_INFO; + case INT: + return BasicTypeInfo.INT_TYPE_INFO; + case LONG: + return BasicTypeInfo.LONG_TYPE_INFO; + case FLOAT: + return BasicTypeInfo.FLOAT_TYPE_INFO; + case DOUBLE: + return BasicTypeInfo.DOUBLE_TYPE_INFO; + case DATE: + return BasicTypeInfo.DATE_TYPE_INFO; + case TIMESTAMP: + return SqlTimeTypeInfo.TIMESTAMP; + case BINARY: + return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; + default: + throw new UnsupportedOperationException( + String.format("Flink doesn't support Hive primitive type %s yet", hiveType)); + } + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE index ea1c160..b67affe 100644 --- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE @@ -8,6 +8,9 @@ This project bundles the following dependencies under the Apache Software Licens - commons-dbcp:commons-dbcp:1.4 - commons-pool:commons-pool:1.5.4 +- com.fasterxml.jackson.core:jackson-annotations:2.6.0 +- com.fasterxml.jackson.core:jackson-core:2.6.5 +- com.fasterxml.jackson.core:jackson-databind:2.6.5 - com.jolbox:bonecp:0.8.0.RELEASE - org.apache.hive:hive-common:2.3.4 - org.apache.hive:hive-metastore:2.3.4 diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java index 315d657..2687699 100644 --- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java @@ -18,10 +18,18 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTestBase; +import org.apache.flink.table.catalog.CatalogTestUtil; import org.apache.flink.table.catalog.GenericCatalogDatabase; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.plan.stats.TableStats; import org.junit.BeforeClass; import org.junit.Test; @@ -40,14 +48,47 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase { catalog.open(); } - // ===================== - // GenericHiveMetastoreCatalog doesn't support table operation yet - // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog - // ===================== + // ------ data types ------ - // TODO: re-enable this test once GenericHiveMetastoreCatalog support table operations @Test - public void testDropDb_DatabaseNotEmptyException() throws Exception { + public void testDataTypes() throws Exception { + // TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT + // [FLINK-12386] Support complete mapping between Flink and Hive data types + TypeInformation[] types = new TypeInformation[] { + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO, 
+ BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.FLOAT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, + BasicTypeInfo.DATE_TYPE_INFO, + SqlTimeTypeInfo.TIMESTAMP + }; + + verifyDataTypes(types); + } + + private void verifyDataTypes(TypeInformation[] types) throws Exception { + String[] colNames = new String[types.length]; + + for (int i = 0; i < types.length; i++) { + colNames[i] = types[i].toString().toLowerCase() + "_col"; + } + + CatalogTable table = new GenericCatalogTable( + new TableSchema(colNames, types), + new TableStats(0), + getBatchTableProperties(), + TEST_COMMENT + ); + + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); } // ------ utils ------ @@ -77,13 +118,48 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase { @Override public CatalogTable createTable() { - // TODO: implement this once GenericHiveMetastoreCatalog support table operations - return null; + return new GenericCatalogTable( + createTableSchema(), + new TableStats(0), + getBatchTableProperties(), + TEST_COMMENT); } @Override public CatalogTable createAnotherTable() { - // TODO: implement this once GenericHiveMetastoreCatalog support table operations - return null; + return new GenericCatalogTable( + createAnotherTableSchema(), + new TableStats(0), + getBatchTableProperties(), + TEST_COMMENT); + } + + @Override + public CatalogTable createStreamingTable() { + return new GenericCatalogTable( + createTableSchema(), + new TableStats(0), + getStreamingTableProperties(), + TEST_COMMENT); + } + + @Override + public CatalogTable createPartitionedTable() { + return new GenericCatalogTable( + createTableSchema(), + new TableStats(0), + createPartitionKeys(), + getBatchTableProperties(), + TEST_COMMENT); + } + + 
@Override + public CatalogTable createAnotherPartitionedTable() { + return new GenericCatalogTable( + createAnotherTableSchema(), + new TableStats(0), + createPartitionKeys(), + getBatchTableProperties(), + TEST_COMMENT); } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java index 83f5bed..4a32313 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java @@ -18,6 +18,8 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.table.catalog.CatalogTestBase; + import org.apache.hadoop.hive.conf.HiveConf; import org.junit.rules.TemporaryFolder; @@ -35,7 +37,7 @@ public class HiveTestUtils { * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore. 
*/ public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException { - return new GenericHiveMetastoreCatalog("test", getHiveConf()); + return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf()); } private static HiveConf getHiveConf() throws IOException { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java index 47498e7..73c2dbc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java @@ -61,8 +61,8 @@ public class GenericCatalogTable implements CatalogTable { TableSchema tableSchema, TableStats tableStats, Map<String, String> properties, - String comment) { - this(tableSchema, tableStats, new ArrayList<>(), properties, comment); + String description) { + this(tableSchema, tableStats, new ArrayList<>(), properties, description); } @Override @@ -91,6 +91,11 @@ public class GenericCatalogTable implements CatalogTable { } @Override + public String getComment() { + return comment; + } + + @Override public GenericCatalogTable copy() { return new GenericCatalogTable( this.tableSchema.copy(), this.tableStats.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment); @@ -106,12 +111,4 @@ public class GenericCatalogTable implements CatalogTable { return Optional.of("This is a catalog table in an im-memory catalog"); } - public String getComment() { - return this.comment; - } - - public void setComment(String comment) { - this.comment = comment; - } - } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java index f88db0d..40a0d9f 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java @@ -18,9 +18,6 @@ package org.apache.flink.table.catalog; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; @@ -40,7 +37,6 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -62,15 +58,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { @After public void close() throws Exception { - if (catalog.tableExists(path1)) { - catalog.dropTable(path1, true); - } - if (catalog.tableExists(path2)) { - catalog.dropTable(path2, true); - } - if (catalog.tableExists(path3)) { - catalog.dropTable(path3, true); - } if (catalog.functionExists(path1)) { catalog.dropFunction(path1, true); } @@ -79,108 +66,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { // ------ tables ------ @Test - public void testCreateTable_Streaming() throws Exception { - catalog.createDatabase(db1, createDb(), false); - CatalogTable table = createStreamingTable(); - catalog.createTable(path1, table, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - } - - @Test - public void testCreateTable_Batch() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - // 
Non-partitioned table - CatalogTable table = createTable(); - catalog.createTable(path1, table, false); - - CatalogBaseTable tableCreated = catalog.getTable(path1); - - CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated); - assertEquals(TABLE_COMMENT, tableCreated.getDescription().get()); - - List<String> tables = catalog.listTables(db1); - - assertEquals(1, tables.size()); - assertEquals(path1.getObjectName(), tables.get(0)); - - catalog.dropTable(path1, false); - - // Partitioned table - table = createPartitionedTable(); - catalog.createTable(path1, table, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - - tables = catalog.listTables(db1); - - assertEquals(1, tables.size()); - assertEquals(path1.getObjectName(), tables.get(0)); - } - - @Test - public void testCreateTable_DatabaseNotExistException() throws Exception { - assertFalse(catalog.databaseExists(db1)); - - exception.expect(DatabaseNotExistException.class); - exception.expectMessage("Database db1 does not exist in Catalog"); - catalog.createTable(nonExistObjectPath, createTable(), false); - } - - @Test - public void testCreateTable_TableAlreadyExistException() throws Exception { - catalog.createDatabase(db1, createDb(), false); - catalog.createTable(path1, createTable(), false); - - exception.expect(TableAlreadyExistException.class); - exception.expectMessage("Table (or view) db1.t1 already exists in Catalog"); - catalog.createTable(path1, createTable(), false); - } - - @Test - public void testCreateTable_TableAlreadyExist_ignored() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - CatalogTable table = createTable(); - catalog.createTable(path1, table, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - - catalog.createTable(path1, createAnotherTable(), true); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - } - - @Test - public void 
testGetTable_TableNotExistException() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - exception.expect(TableNotExistException.class); - exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog"); - catalog.getTable(nonExistObjectPath); - } - - @Test - public void testGetTable_TableNotExistException_NoDb() throws Exception { - exception.expect(TableNotExistException.class); - exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog"); - catalog.getTable(nonExistObjectPath); - } - - @Test - public void testDropTable_nonPartitionedTable() throws Exception { - catalog.createDatabase(db1, createDb(), false); - catalog.createTable(path1, createTable(), false); - - assertTrue(catalog.tableExists(path1)); - - catalog.dropTable(path1, false); - - assertFalse(catalog.tableExists(path1)); - } - - @Test public void testDropTable_partitionedTable() throws Exception { catalog.createDatabase(db1, createDb(), false); catalog.createTable(path1, createPartitionedTable(), false); @@ -214,78 +99,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { } @Test - public void testDropTable_TableNotExistException() throws Exception { - exception.expect(TableNotExistException.class); - exception.expectMessage("Table (or view) non.exist does not exist in Catalog"); - catalog.dropTable(nonExistDbPath, false); - } - - @Test - public void testDropTable_TableNotExist_ignored() throws Exception { - catalog.createDatabase(db1, createDb(), false); - catalog.dropTable(nonExistObjectPath, true); - } - - @Test - public void testAlterTable() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - // Non-partitioned table - CatalogTable table = createTable(); - catalog.createTable(path1, table, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - - CatalogTable newTable = createAnotherTable(); - catalog.alterTable(path1, newTable, false); - - assertNotEquals(table, 
catalog.getTable(path1)); - CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1)); - - catalog.dropTable(path1, false); - - // Partitioned table - table = createPartitionedTable(); - catalog.createTable(path1, table, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - - newTable = createAnotherPartitionedTable(); - catalog.alterTable(path1, newTable, false); - - CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1)); - } - - @Test - public void testAlterTable_TableNotExistException() throws Exception { - exception.expect(TableNotExistException.class); - exception.expectMessage("Table (or view) non.exist does not exist in Catalog"); - catalog.alterTable(nonExistDbPath, createTable(), false); - } - - @Test - public void testAlterTable_TableNotExist_ignored() throws Exception { - catalog.createDatabase(db1, createDb(), false); - catalog.alterTable(nonExistObjectPath, createTable(), true); - - assertFalse(catalog.tableExists(nonExistObjectPath)); - } - - @Test - public void testRenameTable_nonPartitionedTable() throws Exception { - catalog.createDatabase(db1, createDb(), false); - CatalogTable table = createTable(); - catalog.createTable(path1, table, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); - - catalog.renameTable(path1, t2, false); - - CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3)); - assertFalse(catalog.tableExists(path1)); - } - - @Test public void testRenameTable_partitionedTable() throws Exception { catalog.createDatabase(db1, createDb(), false); CatalogTable table = createPartitionedTable(); @@ -305,44 +118,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { assertFalse(catalog.partitionExists(path1, catalogPartitionSpec)); } - @Test - public void testRenameTable_TableNotExistException() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - 
exception.expect(TableNotExistException.class); - exception.expectMessage("Table (or view) db1.t1 does not exist in Catalog"); - catalog.renameTable(path1, t2, false); - } - - @Test - public void testRenameTable_TableNotExistException_ignored() throws Exception { - catalog.createDatabase(db1, createDb(), false); - catalog.renameTable(path1, t2, true); - } - - @Test - public void testRenameTable_TableAlreadyExistException() throws Exception { - catalog.createDatabase(db1, createDb(), false); - CatalogTable table = createTable(); - catalog.createTable(path1, table, false); - catalog.createTable(path3, createAnotherTable(), false); - - exception.expect(TableAlreadyExistException.class); - exception.expectMessage("Table (or view) db1.t2 already exists in Catalog"); - catalog.renameTable(path1, t2, false); - } - - @Test - public void testTableExists() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - assertFalse(catalog.tableExists(path1)); - - catalog.createTable(path1, createTable(), false); - - assertTrue(catalog.tableExists(path1)); - } - // ------ views ------ @Test @@ -951,24 +726,29 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { @Override public CatalogDatabase createDb() { - return new GenericCatalogDatabase(new HashMap<String, String>() {{ - put("k1", "v1"); - }}, TEST_COMMENT); + return new GenericCatalogDatabase( + new HashMap<String, String>() {{ + put("k1", "v1"); + }}, + TEST_COMMENT); } @Override public CatalogDatabase createAnotherDb() { - return new GenericCatalogDatabase(new HashMap<String, String>() {{ - put("k2", "v2"); - }}, "this is another database."); + return new GenericCatalogDatabase( + new HashMap<String, String>() {{ + put("k2", "v2"); + }}, + "this is another database."); } - private GenericCatalogTable createStreamingTable() { + @Override + public GenericCatalogTable createStreamingTable() { return new GenericCatalogTable( createTableSchema(), new TableStats(0), getStreamingTableProperties(), - 
TABLE_COMMENT); + TEST_COMMENT); } @Override @@ -977,7 +757,7 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { createTableSchema(), new TableStats(0), getBatchTableProperties(), - TABLE_COMMENT); + TEST_COMMENT); } @Override @@ -986,29 +766,27 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { createAnotherTableSchema(), new TableStats(0), getBatchTableProperties(), - TABLE_COMMENT); + TEST_COMMENT); } - protected CatalogTable createPartitionedTable() { + @Override + public CatalogTable createPartitionedTable() { return new GenericCatalogTable( createTableSchema(), new TableStats(0), createPartitionKeys(), getBatchTableProperties(), - TABLE_COMMENT); + TEST_COMMENT); } - protected CatalogTable createAnotherPartitionedTable() { + @Override + public CatalogTable createAnotherPartitionedTable() { return new GenericCatalogTable( createAnotherTableSchema(), new TableStats(0), createPartitionKeys(), getBatchTableProperties(), - TABLE_COMMENT); - } - - private List<String> createPartitionKeys() { - return Arrays.asList("second", "third"); + TEST_COMMENT); } private CatalogPartitionSpec createPartitionSpec() { @@ -1053,40 +831,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase { return new GenericCatalogPartition(props); } - private Map<String, String> getBatchTableProperties() { - return new HashMap<String, String>() {{ - put(IS_STREAMING, "false"); - }}; - } - - private Map<String, String> getStreamingTableProperties() { - return new HashMap<String, String>() {{ - put(IS_STREAMING, "true"); - }}; - } - - private TableSchema createTableSchema() { - return new TableSchema( - new String[] {"first", "second", "third"}, - new TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - } - ); - } - - private TableSchema createAnotherTableSchema() { - return new TableSchema( - new String[] {"first2", "second", "third"}, - new TypeInformation[] { - 
BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - } - ); - } - private CatalogView createView() { return new GenericCatalogView( String.format("select * from %s", t1), diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java index f916354..e2157e0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java @@ -30,18 +30,28 @@ import java.util.Optional; public interface CatalogBaseTable { /** * Get the properties of the table. - * @return table property map + * + * @return property map of the table/view */ Map<String, String> getProperties(); /** * Get the schema of the table. - * @return schema of the table + * + * @return schema of the table/view. */ TableSchema getSchema(); /** + * Get comment of the table or view. + * + * @return comment of the table/view. + */ + String getComment(); + + /** * Get a deep copy of the CatalogBaseTable instance. + * * @return a copy of the CatalogBaseTable instance */ CatalogBaseTable copy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java index a8a69f2..5586348 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java @@ -96,6 +96,8 @@ public interface ReadableCatalog { */ boolean databaseExists(String databaseName) throws CatalogException; + // ------ tables and views ------ + /** * Get names of all tables and views under this database. 
An empty list is returned if none exists. * @@ -116,24 +118,24 @@ public interface ReadableCatalog { List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException; /** - * Get a CatalogTable or CatalogView identified by objectPath. + * Get a CatalogTable or CatalogView identified by tablePath. * - * @param objectPath Path of the table or view + * @param tablePath Path of the table or view * @return The requested table or view * @throws TableNotExistException if the target does not exist * @throws CatalogException in case of any runtime exception */ - CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException; + CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException; /** * Check if a table or view exists in this catalog. * - * @param objectPath Path of the table or view + * @param tablePath Path of the table or view * @return true if the given table exists in the catalog * false otherwise * @throws CatalogException in case of any runtime exception */ - boolean tableExists(ObjectPath objectPath) throws CatalogException; + boolean tableExists(ObjectPath tablePath) throws CatalogException; // ------ partitions ------ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java index 04d9bcb..60bc93d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java @@ -102,11 +102,10 @@ public interface ReadableWritableCatalog extends ReadableCatalog { * if set to false, throw an exception, * if set to true, do nothing. 
* @throws TableNotExistException if the table does not exist - * @throws DatabaseNotExistException if the database in tablePath to doesn't exist * @throws CatalogException in case of any runtime exception */ void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException; + throws TableNotExistException, TableAlreadyExistException, CatalogException; /** * Create a new table or view. @@ -128,7 +127,7 @@ public interface ReadableWritableCatalog extends ReadableCatalog { * Note that the new and old CatalogBaseTable must be of the same type. For example, this doesn't * allow alter a regular table to partitioned table, or alter a view to a table, and vice versa. * - * @param tableName path of the table or view to be modified + * @param tablePath path of the table or view to be modified * @param newTable the new table definition * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: * if set to false, throw an exception, @@ -136,7 +135,7 @@ public interface ReadableWritableCatalog extends ReadableCatalog { * @throws TableNotExistException if the table does not exist * @throws CatalogException in case of any runtime exception */ - void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) + void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; // ------ partitions ------ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java index 91cf133..423d141 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java +++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java @@ -23,7 +23,7 @@ package org.apache.flink.table.catalog.exceptions; * */ public class DatabaseNotEmptyException extends Exception { - private static final String MSG = "Database %s in Catalog %s is not empty."; + private static final String MSG = "Database %s in catalog %s is not empty."; public DatabaseNotEmptyException(String catalog, String database, Throwable cause) { super(String.format(MSG, database, catalog), cause); diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java index 5457d34..8117c82 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java @@ -18,9 +18,14 @@ package org.apache.flink.table.catalog; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.junit.After; import org.junit.AfterClass; @@ -29,11 +34,14 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static 
org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; /** @@ -55,9 +63,9 @@ public abstract class CatalogTestBase { protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist"); protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist"); - protected static final String TEST_CATALOG_NAME = "test-catalog"; + public static final String TEST_CATALOG_NAME = "test-catalog"; + protected static final String TEST_COMMENT = "test comment"; - protected static final String TABLE_COMMENT = "This is my batch table"; protected static ReadableWritableCatalog catalog; @@ -66,6 +74,16 @@ public abstract class CatalogTestBase { @After public void cleanup() throws Exception { + if (catalog.tableExists(path1)) { + catalog.dropTable(path1, true); + } + if (catalog.tableExists(path2)) { + catalog.dropTable(path2, true); + } + if (catalog.tableExists(path3)) { + catalog.dropTable(path3, true); + } + if (catalog.databaseExists(db1)) { catalog.dropDatabase(db1, true); } @@ -168,7 +186,7 @@ public abstract class CatalogTestBase { catalog.createTable(path1, createTable(), false); exception.expect(DatabaseNotEmptyException.class); - exception.expectMessage("Database db1 in Catalog test-catalog is not empty"); + exception.expectMessage("Database db1 in catalog test-catalog is not empty"); catalog.dropDatabase(db1, true); } @@ -209,6 +227,220 @@ public abstract class CatalogTestBase { assertTrue(catalog.databaseExists(db1)); } + // ------ tables ------ + + @Test + public void testCreateTable_Streaming() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogTable table = createStreamingTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + } + + @Test + public void testCreateTable_Batch() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + // Non-partitioned table + CatalogTable table = createTable(); + 
catalog.createTable(path1, table, false); + + CatalogBaseTable tableCreated = catalog.getTable(path1); + + CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated); + assertEquals(TEST_COMMENT, tableCreated.getDescription().get()); + + List<String> tables = catalog.listTables(db1); + + assertEquals(1, tables.size()); + assertEquals(path1.getObjectName(), tables.get(0)); + + catalog.dropTable(path1, false); + + // Partitioned table + table = createPartitionedTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + + tables = catalog.listTables(db1); + + assertEquals(1, tables.size()); + assertEquals(path1.getObjectName(), tables.get(0)); + } + + @Test + public void testCreateTable_DatabaseNotExistException() throws Exception { + assertFalse(catalog.databaseExists(db1)); + + exception.expect(DatabaseNotExistException.class); + exception.expectMessage("Database db1 does not exist in Catalog"); + catalog.createTable(nonExistObjectPath, createTable(), false); + } + + @Test + public void testCreateTable_TableAlreadyExistException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, createTable(), false); + + exception.expect(TableAlreadyExistException.class); + exception.expectMessage("Table (or view) db1.t1 already exists in Catalog"); + catalog.createTable(path1, createTable(), false); + } + + @Test + public void testCreateTable_TableAlreadyExist_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + CatalogTable table = createTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + + catalog.createTable(path1, createAnotherTable(), true); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + } + + @Test + public void testGetTable_TableNotExistException() throws Exception { + catalog.createDatabase(db1, 
createDb(), false); + + exception.expect(TableNotExistException.class); + exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog"); + catalog.getTable(nonExistObjectPath); + } + + @Test + public void testGetTable_TableNotExistException_NoDb() throws Exception { + exception.expect(TableNotExistException.class); + exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog"); + catalog.getTable(nonExistObjectPath); + } + + @Test + public void testDropTable_nonPartitionedTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, createTable(), false); + + assertTrue(catalog.tableExists(path1)); + + catalog.dropTable(path1, false); + + assertFalse(catalog.tableExists(path1)); + } + + @Test + public void testDropTable_TableNotExistException() throws Exception { + exception.expect(TableNotExistException.class); + exception.expectMessage("Table (or view) non.exist does not exist in Catalog"); + catalog.dropTable(nonExistDbPath, false); + } + + @Test + public void testDropTable_TableNotExist_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.dropTable(nonExistObjectPath, true); + } + + @Test + public void testAlterTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + // Non-partitioned table + CatalogTable table = createTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + + CatalogTable newTable = createAnotherTable(); + catalog.alterTable(path1, newTable, false); + + assertNotEquals(table, catalog.getTable(path1)); + CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1)); + + catalog.dropTable(path1, false); + + // Partitioned table + table = createPartitionedTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + + newTable = 
createAnotherPartitionedTable(); + catalog.alterTable(path1, newTable, false); + + CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1)); + } + + @Test + public void testAlterTable_TableNotExistException() throws Exception { + exception.expect(TableNotExistException.class); + exception.expectMessage("Table (or view) non.exist does not exist in Catalog"); + catalog.alterTable(nonExistDbPath, createTable(), false); + } + + @Test + public void testAlterTable_TableNotExist_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.alterTable(nonExistObjectPath, createTable(), true); + + assertFalse(catalog.tableExists(nonExistObjectPath)); + } + + @Test + public void testRenameTable_nonPartitionedTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogTable table = createTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + + catalog.renameTable(path1, t2, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3)); + assertFalse(catalog.tableExists(path1)); + } + + @Test + public void testRenameTable_TableNotExistException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + exception.expect(TableNotExistException.class); + exception.expectMessage("Table (or view) db1.t1 does not exist in Catalog"); + catalog.renameTable(path1, t2, false); + } + + @Test + public void testRenameTable_TableNotExistException_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.renameTable(path1, t2, true); + } + + @Test + public void testRenameTable_TableAlreadyExistException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogTable table = createTable(); + catalog.createTable(path1, table, false); + catalog.createTable(path3, createAnotherTable(), false); + + exception.expect(TableAlreadyExistException.class); + 
exception.expectMessage("Table (or view) db1.t2 already exists in Catalog"); + catalog.renameTable(path1, t2, false); + } + + @Test + public void testTableExists() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + assertFalse(catalog.tableExists(path1)); + + catalog.createTable(path1, createTable(), false); + + assertTrue(catalog.tableExists(path1)); + } + // ------ utilities ------ /** @@ -245,4 +477,63 @@ public abstract class CatalogTestBase { * @return another CatalogTable instance */ public abstract CatalogTable createAnotherTable(); + + /** + * Create a streaming CatalogTable instance by specific catalog implementation. + * + * @return a streaming CatalogTable instance + */ + public abstract CatalogTable createStreamingTable(); + + /** + * Create a partitioned CatalogTable instance by specific catalog implementation. + * + * @return a partitioned CatalogTable instance + */ + public abstract CatalogTable createPartitionedTable(); + + /** + * Create another partitioned CatalogTable instance by specific catalog implementation. 
+ * + * @return another partitioned CatalogTable instance + */ + public abstract CatalogTable createAnotherPartitionedTable(); + + protected TableSchema createTableSchema() { + return new TableSchema( + new String[] {"first", "second", "third"}, + new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + } + ); + } + + protected TableSchema createAnotherTableSchema() { + return new TableSchema( + new String[] {"first2", "second", "third"}, + new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO + } + ); + } + + protected List<String> createPartitionKeys() { + return Arrays.asList("second", "third"); + } + + protected Map<String, String> getBatchTableProperties() { + return new HashMap<String, String>() {{ + put(IS_STREAMING, "false"); + }}; + } + + protected Map<String, String> getStreamingTableProperties() { + return new HashMap<String, String>() {{ + put(IS_STREAMING, "true"); + }}; + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java index 0700736..57341fe 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java @@ -33,26 +33,26 @@ public class CatalogTestUtil { assertEquals(t1.getDescription(), t2.getDescription()); } - protected static void checkEquals(TableStats ts1, TableStats ts2) { + public static void checkEquals(TableStats ts1, TableStats ts2) { assertEquals(ts1.getRowCount(), ts2.getRowCount()); assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size()); } - protected static void checkEquals(CatalogView v1, CatalogView v2) { + public static void checkEquals(CatalogView v1, CatalogView v2) { 
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery()); assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery()); } - protected static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) { + public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) { assertEquals(d1.getProperties(), d2.getProperties()); } - protected static void checkEquals(CatalogFunction f1, CatalogFunction f2) { + public static void checkEquals(CatalogFunction f1, CatalogFunction f2) { assertEquals(f1.getClassName(), f2.getClassName()); assertEquals(f1.getProperties(), f2.getProperties()); } - protected static void checkEquals(CatalogPartition p1, CatalogPartition p2) { + public static void checkEquals(CatalogPartition p1, CatalogPartition p2) { assertEquals(p1.getProperties(), p2.getProperties()); } }