This is an automated email from the ASF dual-hosted git repository. ngangam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit d65307d795ad6415de1e8714859fa0ce4bd07a87 Author: Naveen Gangam <ngan...@cloudera.com> AuthorDate: Thu Nov 12 00:12:55 2020 -0500 Implemented getTable and getTableNames for MYSQL (working) --- .../database/create/CreateDatabaseAnalyzer.java | 10 +- .../url/AlterDataConnectorSetUrlAnalyzer.java | 4 +- .../create/CreateDataConnectorAnalyzer.java | 2 +- .../apache/hadoop/hive/metastore/IHMSHandler.java | 12 ++ .../AbstractDataConnectorProvider.java | 7 +- .../DataConnectorProviderFactory.java | 39 ++++-- .../dataconnector/IDataConnectorProvider.java | 6 + .../dataconnector/jdbc/JDBCConnectorProvider.java | 150 ++++++++++++++++++++- 8 files changed, 198 insertions(+), 32 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/create/CreateDatabaseAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/create/CreateDatabaseAnalyzer.java index 42d7c79..d342db0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/create/CreateDatabaseAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/create/CreateDatabaseAnalyzer.java @@ -75,12 +75,11 @@ public class CreateDatabaseAnalyzer extends BaseSemanticAnalyzer { break; case HiveParser.TOK_DATACONNECTOR: type = "REMOTE"; - locationUri = "REMOTE_DATABASE"; // TODO - // i++; + // locationUri = "REMOTE_DATABASE"; // TODO ASTNode nextNode = (ASTNode) root.getChild(i); connectorName = ((ASTNode)nextNode).getChild(0).getText(); outputs.add(toWriteEntity(connectorName)); - outputs.remove(toWriteEntity(locationUri)); + // outputs.remove(toWriteEntity(locationUri)); if (managedLocationUri != null) { outputs.remove(toWriteEntity(managedLocationUri)); managedLocationUri = null; @@ -91,7 +90,6 @@ public class CreateDatabaseAnalyzer extends BaseSemanticAnalyzer { } } - // String remoteDbName = props.get("connector.remoteDbName"); CreateDatabaseDesc desc = null; Database database = new Database(databaseName, comment, locationUri, props); if (type.equalsIgnoreCase("NATIVE")) { @@ -103,7 +101,7 @@ public class CreateDatabaseAnalyzer extends BaseSemanticAnalyzer { } } else { String remoteDbName = databaseName; - if (props != null && props.get("connector.remoteDbName") != null) // TODO + if (props != null && props.get("connector.remoteDbName") != null) // TODO finalize the property name remoteDbName = props.get("connector.remoteDbName"); desc = new CreateDatabaseDesc(databaseName, comment, locationUri, null, ifNotExists, props, type, connectorName, remoteDbName); @@ -112,8 +110,6 @@ public class CreateDatabaseAnalyzer extends BaseSemanticAnalyzer { database.setRemote_dbname(remoteDbName); } rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc))); - - // database = new Database(databaseName, comment, locationUri, props); outputs.add(new WriteEntity(database, WriteEntity.WriteType.DDL_NO_LOCK)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/alter/url/AlterDataConnectorSetUrlAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/alter/url/AlterDataConnectorSetUrlAnalyzer.java index 217f702..c7b0af8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/alter/url/AlterDataConnectorSetUrlAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/alter/url/AlterDataConnectorSetUrlAnalyzer.java @@ -38,9 +38,7 @@ public class AlterDataConnectorSetUrlAnalyzer extends AbstractAlterDataConnector public void analyzeInternal(ASTNode root) throws SemanticException { String connectorName = getUnescapedName((ASTNode) root.getChild(0)); String newURL = unescapeSQLString(root.getChild(1).getText()); - - outputs.add(toWriteEntity(newURL)); - + // TODO add some validation for the URL? AlterDataConnectorSetUrlDesc desc = new AlterDataConnectorSetUrlDesc(connectorName, newURL); addAlterDataConnectorDesc(desc); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/create/CreateDataConnectorAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/create/CreateDataConnectorAnalyzer.java index 5867108..a277baf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/create/CreateDataConnectorAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/dataconnector/create/CreateDataConnectorAnalyzer.java @@ -63,7 +63,7 @@ public class CreateDataConnectorAnalyzer extends BaseSemanticAnalyzer { break; case HiveParser.TOK_DATACONNECTORURL: url = unescapeSQLString(childNode.getChild(0).getText()); - outputs.add(toWriteEntity(url)); + // outputs.add(toWriteEntity(url)); break; case HiveParser.TOK_DATACONNECTORTYPE: type = unescapeSQLString(childNode.getChild(0).getText()); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java index a969d5e..fbd8f87 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.DataConnector; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; @@ -105,4 +106,15 @@ public interface IHMSHandler extends ThriftHiveMetastore.Iface, Configurable { * @return list of non-transactional listeners. */ List<MetaStoreEventListener> getListeners(); + + /** + * Equivalent to get_connector, but does not write to audit logs, or fire pre-event listeners. + * Meant to be used for internal hive classes that don't use the thrift interface. + * @param name connector name + * @return DataConnector object + * @throws NoSuchObjectException If the connector does not exist. + * @throws MetaException If another error occurs. + */ + DataConnector get_dataconnector_core(final String name) + throws NoSuchObjectException, MetaException; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/AbstractDataConnectorProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/AbstractDataConnectorProvider.java index bad1ecb..2a73236 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/AbstractDataConnectorProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/AbstractDataConnectorProvider.java @@ -8,13 +8,8 @@ import java.net.ConnectException; import java.util.List; public abstract class AbstractDataConnectorProvider implements IDataConnectorProvider { - public static final String MYSQL_TYPE = "mysql"; - public static final String POSTGRES_TYPE = "postgres"; - public static final String ORACLE_TYPE = "oracle"; - public static final String MSSQL_TYPE = "mssql"; - protected String scoped_db = null; - protected Object handle = null; + protected Object handle = null; protected boolean isOpen = false; protected DataConnector connector = null; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/DataConnectorProviderFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/DataConnectorProviderFactory.java index babe89b..ce43633 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/DataConnectorProviderFactory.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/DataConnectorProviderFactory.java @@ -5,6 +5,13 @@ import org.apache.hadoop.hive.metastore.api.DataConnector; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.DatabaseType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.DERBY_TYPE; +import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.MSSQL_TYPE; +import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.MYSQL_TYPE; +import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.ORACLE_TYPE; +import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.POSTGRES_TYPE; +import org.apache.hadoop.hive.metastore.dataconnector.jdbc.JDBCConnectorProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,33 +39,43 @@ public class DataConnectorProviderFactory { public static synchronized IDataConnectorProvider getDataConnectorProvider(Database db) throws MetaException { IDataConnectorProvider provider = null; + DataConnector connector = null; if (db.getType() == DatabaseType.NATIVE) { throw new MetaException("Database " + db.getName() + " is of type NATIVE, no connector available"); } - // DataConnector connector = hmsHandler.getMS().getDataConnector(db.getConnector_name()); - + String scopedDb = (db.getRemote_dbname() != null) ? db.getRemote_dbname() : db.getName(); if (cache.containsKey(db.getConnector_name().toLowerCase() != null)) { provider = cache.get(db.getConnector_name().toLowerCase()); - if (provider != null) { - provider.setScope(db.getName()); + provider.setScope(scopedDb); } - return provider; } - // String type = connector.getType(); - String type = "mysql"; + try { + connector = hmsHandler.get_dataconnector_core(db.getConnector_name()); + } catch (NoSuchObjectException notexists) { + throw new MetaException("Data connector " + db.getConnector_name() + " associated with database " + + db.getName() + " does not exist"); + } + String type = connector.getType(); switch (type) { - case "mysql": - + case DERBY_TYPE: + case MSSQL_TYPE: + case MYSQL_TYPE: + case ORACLE_TYPE: + case POSTGRES_TYPE: + try { + provider = new JDBCConnectorProvider(scopedDb, connector); + } catch (Exception e) { + throw new MetaException("Could not instantiate a provider for database " + db.getName()); + } ; default: ; } - - cache.put(db.getConnector_name().toLowerCase(), provider); + cache.put(connector.getName().toLowerCase(), provider); return provider; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/IDataConnectorProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/IDataConnectorProvider.java index 2239939..73b3db9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/IDataConnectorProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/IDataConnectorProvider.java @@ -20,6 +20,12 @@ import java.util.Map; @InterfaceAudience.Public @InterfaceStability.Evolving public interface IDataConnectorProvider { + public static final String MYSQL_TYPE = "mysql"; + public static final String POSTGRES_TYPE = "postgres"; + public static final String ORACLE_TYPE = "oracle"; + public static final String MSSQL_TYPE = "mssql"; + public static final String DERBY_TYPE = "derby"; + DataConnector connector = null; /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/jdbc/JDBCConnectorProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/jdbc/JDBCConnectorProvider.java index 2e92cc4..7e9d408 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/jdbc/JDBCConnectorProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/dataconnector/jdbc/JDBCConnectorProvider.java @@ -10,14 +10,25 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Date; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.metastore.ColumnType; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.DataConnector; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.dataconnector.AbstractDataConnectorProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class JDBCConnectorProvider extends AbstractDataConnectorProvider { private static Logger LOG = LoggerFactory.getLogger(JDBCConnectorProvider.class); @@ -30,7 +41,7 @@ public class JDBCConnectorProvider extends AbstractDataConnectorProvider { public JDBCConnectorProvider(String dbName, DataConnector dataConn) { super(dbName, dataConn); - this.type = connector.getType(); + this.type = connector.getType().toUpperCase(); // TODO this.jdbcUrl = connector.getUrl(); this.username = connector.getParameters().get("dataconnector.username"); this.password = connector.getParameters().get("dataconnector.password"); @@ -52,7 +63,8 @@ public class JDBCConnectorProvider extends AbstractDataConnectorProvider { throw new ConnectException("Could not connect to remote datasource at " + jdbcUrl + ",cause:" + sqle.getMessage()); } break; - + case POSTGRES_TYPE: + default: throw new RuntimeException("Unsupported JDBC type"); } @@ -100,7 +112,7 @@ public class JDBCConnectorProvider extends AbstractDataConnectorProvider { @Override public List<String> getTableNames() throws MetaException { ResultSet rs = null; try { - rs = getConnection().getMetaData().getTables(null, scoped_db, "*", new String[] { "TABLE" }); + rs = getConnection().getMetaData().getTables(scoped_db, null, null, new String[] { "TABLE" }); if (rs != null) { List<String> tables = new ArrayList<String>(); while(rs.next()) { @@ -128,7 +140,137 @@ public class JDBCConnectorProvider extends AbstractDataConnectorProvider { * @param tableName */ @Override public Table getTable(String tableName) throws MetaException { - return null; + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery( + "SELECT table_name, column_name, is_nullable, data_type, character_maximum_length FROM INFORMATION_SCHEMA.Columns where table_schema='" + + scoped_db + "' and table_name='" + tableName + "'"); + List<FieldSchema> cols = new ArrayList<>(); + // TODO throw exception is RS is empty + while (rs.next()) { + FieldSchema fs = new FieldSchema(); + fs.setName(rs.getString("COLUMN_NAME")); + fs.setType(getDataType(rs.getString("DATA_TYPE"), rs.getInt("CHARACTER_MAXIMUM_LENGTH"))); + fs.setComment("inferred column type"); + cols.add(fs); + } + + //Setting the storage descriptor. + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(cols); + // sd.se + SerDeInfo serdeInfo = new SerDeInfo(); + serdeInfo.setName(tableName); + serdeInfo.setSerializationLib("org.apache.hive.storage.jdbc.JdbcSerDe"); + Map<String, String> serdeParams = new HashMap<String, String>(); + serdeParams.put("serialization.format", "1"); + serdeInfo.setParameters(serdeParams); + + // StorageHandler + + // serdeInfo.setDeserializerClass(); + sd.setSerdeInfo(serdeInfo); + // sd.getSerdeInfo().setName(tableName); + sd.setInputFormat("org.apache.hive.storage.jdbc.JdbcInputFormat"); // TODO + sd.setOutputFormat("org.apache.hive.storage.jdbc.JdbcOutputFormat"); // TODO + sd.setLocation("/tmp/some_dummy_path"); // TODO + sd.setBucketCols(new ArrayList<String>()); + sd.setSortCols(new ArrayList<Order>()); + + //Setting the table properties. + Map<String, String> tblProps = new HashMap<>(); + tblProps.put(Constants.JDBC_DATABASE_TYPE, this.type); + tblProps.put(Constants.JDBC_DRIVER, this.driverClassName); + tblProps.put(Constants.JDBC_URL, this.jdbcUrl); // "jdbc://localhost:3306/hive" + tblProps.put(Constants.JDBC_USERNAME, this.username); + tblProps.put(Constants.JDBC_PASSWORD, this.password); + tblProps.put(Constants.JDBC_TABLE, tableName); + tblProps.put(hive_metastoreConstants.META_TABLE_STORAGE, Constants.JDBC_HIVE_STORAGE_HANDLER_ID); + tblProps.put("EXTERNAL", "TRUE"); + // TODO: Need to include schema, catalog info in the paramters list. + + //Setting the required table information + Table table = new Table(); + table.setTableName(tableName); + table.setTableType(TableType.EXTERNAL_TABLE.toString()); + table.setDbName(scoped_db); + table.setSd(sd); + table.setParameters(tblProps); + // set partition keys to empty + table.setPartitionKeys(new ArrayList<FieldSchema>()); + + return table; + } catch (Exception e) { + LOG.warn("Exception retrieving remote table " + scoped_db + "." + tableName + " via data connector " + + connector.getName()); + throw new MetaException("Error retrieving remote table:" + e); + } + } + + private String wrapSize(int size) { + return "(" + size + ")"; } + private String getDataType(String mySqlType, int size) { + //TODO: Geomentric, network, bit, array data types of postgresql needs to be supported. + switch(mySqlType) + { + case "char": + return ColumnType.CHAR_TYPE_NAME + wrapSize(size); + case "varchar": + case "tinytext": + return ColumnType.VARCHAR_TYPE_NAME + wrapSize(size); + case "text": + case "mediumtext": + case "enum": + case "set": + case "tsvector": + case "tsquery": + case "uuid": + case "json": + return ColumnType.STRING_TYPE_NAME; + case "blob": + case "mediumblob": + case "longblob": + case "bytea": + return ColumnType.BINARY_TYPE_NAME; + case "tinyint": + return ColumnType.TINYINT_TYPE_NAME; + case "smallint": + case "smallserial": + return ColumnType.SMALLINT_TYPE_NAME; + case "mediumint": + case "int": + case "serial": + return ColumnType.INT_TYPE_NAME; + case "bigint": + case "bigserial": + case "money": + return ColumnType.BIGINT_TYPE_NAME; + case "float": + case "real": + return ColumnType.FLOAT_TYPE_NAME; + case "double": + case "double precision": + return ColumnType.DOUBLE_TYPE_NAME; + case "decimal": + case "numeric": + return ColumnType.DECIMAL_TYPE_NAME; + case "date": + return ColumnType.DATE_TYPE_NAME; + case "datetime": + return ColumnType.DATETIME_TYPE_NAME; + case "timestamp": + case "time": + case "interval": + return ColumnType.TIMESTAMP_TYPE_NAME; + case "timestampz": + case "timez": + return ColumnType.TIMESTAMPTZ_TYPE_NAME; + case "boolean": + return ColumnType.BOOLEAN_TYPE_NAME; + default: + return ColumnType.VOID_TYPE_NAME; + } + } }