This is an automated email from the ASF dual-hosted git repository. vihangk1 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5af8a61 HIVE-24120 Plugin for external DatabaseProduct in standalone HMS (#1470) 5af8a61 is described below commit 5af8a612f844482a02fd8e59aec25c854ba8a175 Author: gatorblue <70545723+gatorb...@users.noreply.github.com> AuthorDate: Mon Oct 19 16:04:07 2020 -0400 HIVE-24120 Plugin for external DatabaseProduct in standalone HMS (#1470) --- .gitignore | 2 + .../hcatalog/listener/DbNotificationListener.java | 11 +- .../hadoop/hive/metastore/conf/MetastoreConf.java | 11 + .../hadoop/hive/metastore/DatabaseProduct.java | 644 ++++++++++++++++++++- .../hadoop/hive/metastore/MetaStoreDirectSql.java | 40 +- .../apache/hadoop/hive/metastore/ObjectStore.java | 29 +- .../datasource/DbCPDataSourceProvider.java | 25 +- .../datasource/HikariCPDataSourceProvider.java | 25 +- .../hadoop/hive/metastore/tools/SQLGenerator.java | 144 +---- .../hadoop/hive/metastore/txn/TxnDbUtil.java | 154 ++--- .../hadoop/hive/metastore/txn/TxnHandler.java | 102 +--- .../hadoop/hive/metastore/DummyCustomRDBMS.java | 122 ++++ .../hadoop/hive/metastore/txn/TestTxnUtils.java | 30 +- 13 files changed, 883 insertions(+), 456 deletions(-) diff --git a/.gitignore b/.gitignore index 9dc3dc4..83859c9 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ build-eclipse .settings .factorypath *.launch +*.metadata *~ metastore_db common/src/gen @@ -39,3 +40,4 @@ launch.json settings.json kafka-handler/src/test/gen **/.vscode/ +/.recommenders/ diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 6041af7..d7757e6 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants; import org.apache.hadoop.hive.metastore.RawStore; @@ -132,7 +131,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; -import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; /** * An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that @@ -959,8 +957,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener try { stmt = dbConn.createStatement(); - if (sqlGenerator.getDbProduct() == MYSQL) { - stmt.execute("SET @@session.sql_mode=ANSI_QUOTES"); + String st = sqlGenerator.getDbProduct().getPrepareTxnStmt(); + if (st != null) { + stmt.execute(st); } String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" + @@ -1054,14 +1053,14 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener stmt = dbConn.createStatement(); event.setMessageFormat(msgEncoder.getMessageFormat()); - if (sqlGenerator.getDbProduct() == MYSQL) { + if (sqlGenerator.getDbProduct().isMYSQL()) { stmt.execute("SET @@session.sql_mode=ANSI_QUOTES"); } // Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache // .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's // only one row in the table, this shouldn't cause any performance degradation. - if (sqlGenerator.getDbProduct() == DatabaseProduct.DERBY) { + if (sqlGenerator.getDbProduct().isDERBY()) { String lockingQuery = "lock table \"NOTIFICATION_SEQUENCE\" in exclusive mode"; LOG.info("Going to execute query <" + lockingQuery + ">"); stmt.executeUpdate(lockingQuery); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index c179ace..cdbe919 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1356,6 +1356,17 @@ public class MetastoreConf { HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", "hive.txn.stats.enabled", true, "Whether Hive supports transactional stats (accurate stats for transactional tables)"), + // External RDBMS support + USE_CUSTOM_RDBMS("metastore.use.custom.database.product", + "hive.metastore.use.custom.database.product", false, + "Use an external RDBMS which is not in the list of natively supported databases (Derby,\n" + + "Mysql, Oracle, Postgres, MSSQL), as defined by hive.metastore.db.type. If this configuration\n" + + "is true, the metastore.custom.database.product.classname must be set to a valid class name"), + CUSTOM_RDBMS_CLASSNAME("metastore.custom.database.product.classname", + "hive.metastore.custom.database.product.classname", "none", + "Hook for external RDBMS. This class will be instantiated only when " + + "metastore.use.custom.database.product is set to true."), + // Deprecated Hive values that we are keeping for backwards compatibility. @Deprecated HIVE_CODAHALE_METRICS_REPORTER_CLASSES("hive.service.metrics.codahale.reporter.classes", diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java index b798cdd..8cde90d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java @@ -20,60 +20,187 @@ package org.apache.hadoop.hive.metastore; import java.sql.SQLException; import java.sql.SQLTransactionRollbackException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -/** Database product infered via JDBC. */ -public enum DatabaseProduct { - DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, OTHER; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** Database product inferred via JDBC. Encapsulates all SQL logic associated with + * the database product. + * This class is a singleton, which is instantiated the first time + * method determineDatabaseProduct is invoked. + * Tests that need to create multiple instances can use the reset method + * */ +public class DatabaseProduct implements Configurable { + static final private Logger LOG = LoggerFactory.getLogger(DatabaseProduct.class.getName()); + + private static enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED}; + public DbType dbType; + + // Singleton instance + private static DatabaseProduct theDatabaseProduct; + + Configuration myConf; + /** + * Protected constructor for singleton class + */ + protected DatabaseProduct() {} + + public static final String DERBY_NAME = "derby"; + public static final String SQL_SERVER_NAME = "microsoft sql server"; + public static final String MYSQL_NAME = "mysql"; + public static final String POSTGRESQL_NAME = "postgresql"; + public static final String ORACLE_NAME = "oracle"; + public static final String UNDEFINED_NAME = "other"; + /** * Determine the database product type * @param productName string to defer database connection * @return database product type */ - public static DatabaseProduct determineDatabaseProduct(String productName) throws SQLException { - if (productName == null) { - return OTHER; + public static DatabaseProduct determineDatabaseProduct(String productName, + Configuration conf) { + DbType dbt; + + if (theDatabaseProduct != null) { + Preconditions.checkState(theDatabaseProduct.dbType == getDbType(productName)); + return theDatabaseProduct; } + + // This method may be invoked by concurrent connections + synchronized (DatabaseProduct.class) { + + if (productName == null) { + productName = UNDEFINED_NAME; + } + + dbt = getDbType(productName); + + // Check for null again in case of race condition + if (theDatabaseProduct == null) { + Preconditions.checkNotNull(conf, "Configuration is null"); + // Check if we are using an external database product + boolean isExternal = MetastoreConf.getBoolVar(conf, ConfVars.USE_CUSTOM_RDBMS); + + if (isExternal) { + // The DatabaseProduct will be created by instantiating an external class via + // reflection. The external class can override any method in the current class + String className = MetastoreConf.getVar(conf, ConfVars.CUSTOM_RDBMS_CLASSNAME); + + if (className != null) { + try { + theDatabaseProduct = (DatabaseProduct) + ReflectionUtils.newInstance(Class.forName(className), conf); + + LOG.info(String.format("Using custom RDBMS %s", className)); + dbt = DbType.CUSTOM; + } catch (Exception e) { + throw new RuntimeException( + "Caught exception instantiating custom database product", e); + } + } else { + throw new RuntimeException( + "Unexpected: metastore.use.custom.database.product was set, " + + "but metastore.custom.database.product.classname was not"); + } + } + + if (theDatabaseProduct == null) { + theDatabaseProduct = new DatabaseProduct(); + } + + theDatabaseProduct.dbType = dbt; + } + } + return theDatabaseProduct; + } + + private static DbType getDbType(String productName) { + DbType dbt; productName = productName.toLowerCase(); - if (productName.contains("derby")) { - return DERBY; - } else if (productName.contains("microsoft sql server")) { - return SQLSERVER; - } else if (productName.contains("mysql")) { - return MYSQL; - } else if (productName.contains("oracle")) { - return ORACLE; - } else if (productName.contains("postgresql")) { - return POSTGRES; + + if (productName.contains(DERBY_NAME)) { + dbt = DbType.DERBY; + } else if (productName.contains(SQL_SERVER_NAME)) { + dbt = DbType.SQLSERVER; + } else if (productName.contains(MYSQL_NAME)) { + dbt = DbType.MYSQL; + } else if (productName.contains(ORACLE_NAME)) { + dbt = DbType.ORACLE; + } else if (productName.contains(POSTGRESQL_NAME)) { + dbt = DbType.POSTGRES; } else { - return OTHER; + dbt = DbType.UNDEFINED; } + return dbt; + } + + public final boolean isDERBY() { + return dbType == DbType.DERBY; + } + + public final boolean isMYSQL() { + return dbType == DbType.MYSQL; } - public static boolean isDeadlock(DatabaseProduct dbProduct, SQLException e) { + public final boolean isORACLE() { + return dbType == DbType.ORACLE; + } + + public final boolean isSQLSERVER() { + return dbType == DbType.SQLSERVER; + } + + public final boolean isPOSTGRES() { + return dbType == DbType.POSTGRES; + } + + public final boolean isCUSTOM() { + return dbType == DbType.CUSTOM; + } + + public final boolean isUNDEFINED() { + return dbType == DbType.UNDEFINED; + } + + public boolean isDeadlock(SQLException e) { return e instanceof SQLTransactionRollbackException - || ((dbProduct == MYSQL || dbProduct == POSTGRES || dbProduct == SQLSERVER) + || ((isMYSQL() || isPOSTGRES() || isSQLSERVER() || isCUSTOM()) && "40001".equals(e.getSQLState())) - || (dbProduct == POSTGRES && "40P01".equals(e.getSQLState())) - || (dbProduct == ORACLE && (e.getMessage() != null && (e.getMessage().contains("deadlock detected") + || (isPOSTGRES() && "40P01".equals(e.getSQLState())) + || (isORACLE() && (e.getMessage() != null && (e.getMessage().contains("deadlock detected") || e.getMessage().contains("can't serialize access for this transaction")))); } /** * Whether the RDBMS has restrictions on IN list size (explicit, or poor perf-based). */ - public static boolean needsInBatching(DatabaseProduct dbType) { - return dbType == ORACLE || dbType == SQLSERVER; + protected boolean needsInBatching() { + return isORACLE() || isSQLSERVER(); } /** * Whether the RDBMS has a bug in join and filter operation order described in DERBY-6358. */ - public static boolean hasJoinOperationOrderBug(DatabaseProduct dbType) { - return dbType == DERBY || dbType == ORACLE || dbType == POSTGRES; + protected boolean hasJoinOperationOrderBug() { + return isDERBY() || isORACLE() || isPOSTGRES(); } - public static String getHiveSchemaPostfix(DatabaseProduct dbType) { + public String getHiveSchemaPostfix() { switch (dbType) { case SQLSERVER: return "mssql"; @@ -81,10 +208,473 @@ public enum DatabaseProduct { case MYSQL: case POSTGRES: case ORACLE: + case CUSTOM: return dbType.name().toLowerCase(); - case OTHER: + case UNDEFINED: default: return null; } } + + @VisibleForTesting + public static void reset() { + theDatabaseProduct = null; + } + + protected String toDate(String tableValue) { + if (isORACLE()) { + return "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')"; + } else { + return "cast(" + tableValue + " as date)"; + } + } + + /** + * Returns db-specific logic to be executed at the beginning of a transaction. + * Used in pooled connections. + */ + public String getPrepareTxnStmt() { + if (isMYSQL()) { + return "SET @@session.sql_mode=ANSI_QUOTES"; + } + else { + return null; + } + } + + private static final EnumMap<DatabaseProduct.DbType, String> DB_EPOCH_FN = + new EnumMap<DatabaseProduct.DbType, String>(DatabaseProduct.DbType.class) {{ + put(DbType.DERBY, "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) + + "'), current_timestamp) } / 1000000"); + put(DbType.CUSTOM, "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) + + "'), current_timestamp) } / 1000000"); + put(DbType.MYSQL, "round(unix_timestamp(now(3)) * 1000)"); + put(DbType.POSTGRES, "round(extract(epoch from current_timestamp) * 1000)"); + put(DbType.ORACLE, "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " + + "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)"); + put(DbType.SQLSERVER, "datediff_big(millisecond, '19700101', sysutcdatetime())"); + }}; + + private static final EnumMap<DatabaseProduct.DbType, String> DB_SEED_FN = + new EnumMap<DatabaseProduct.DbType, String>(DatabaseProduct.DbType.class) {{ + put(DbType.DERBY, "ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH %s"); + put(DbType.CUSTOM, "ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH %s"); + put(DbType.MYSQL, "ALTER TABLE \"TXNS\" AUTO_INCREMENT = %s"); + put(DbType.POSTGRES, "ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART WITH %s"); + put(DbType.ORACLE, "ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH %s )"); + put(DbType.SQLSERVER, "DBCC CHECKIDENT ('txns', RESEED, %s )"); + }}; + + public String getTxnSeedFn(long seedTxnId) { + return String.format(DB_SEED_FN.get(dbType), seedTxnId); + } + + /** + * Get database specific function which returns the milliseconds value after the epoch. + * @throws MetaException For unknown database type. + */ + public String getMillisAfterEpochFn() throws MetaException { + String epochFn = DB_EPOCH_FN.get(dbType); + if (epochFn != null) { + return epochFn; + } else { + String msg = "Unknown database product: " + dbType.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } + + /** + * Returns the query to fetch the current timestamp in milliseconds in the database + * @throws MetaException when the dbType is unknown. + */ + public String getDBTime() throws MetaException { + String s; + switch (dbType) { + case DERBY: + case CUSTOM: // ANSI SQL + s = "values current_timestamp"; + break; + + case MYSQL: + case POSTGRES: + case SQLSERVER: + s = "select current_timestamp"; + break; + + case ORACLE: + s = "select current_timestamp from dual"; + break; + + default: + String msg = "Unknown database product: " + dbType.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + return s; + } + + /** + * Given an expr (such as a column name) returns a database specific SQL function string + * which when executed returns if the value of the expr is within the given interval of + * the current timestamp in the database + * @param expr + * @param intervalInSeconds + * @return + * @throws MetaException + */ + public String isWithinCheckInterval(String expr, long intervalInSeconds) throws MetaException { + String condition; + switch (dbType) { + case DERBY: + case CUSTOM: + condition = " {fn TIMESTAMPDIFF(sql_tsi_second, " + expr + ", current_timestamp)} <= " + intervalInSeconds; + break; + case MYSQL: + case POSTGRES: + condition = expr + " >= current_timestamp - interval '" + intervalInSeconds + "' second"; + break; + case SQLSERVER: + condition = "DATEDIFF(second, " + expr + ", current_timestamp) <= " + intervalInSeconds; + break; + case ORACLE: + condition = expr + " >= current_timestamp - numtodsinterval(" + intervalInSeconds + " , 'second')"; + break; + default: + String msg = "Unknown database product: " + dbType.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + return condition; + } + + public String addForUpdateClause(String selectStatement) throws MetaException { + switch (dbType) { + case DERBY: + //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html + //sadly in Derby, FOR UPDATE doesn't meant what it should + return selectStatement; + case MYSQL: + //http://dev.mysql.com/doc/refman/5.7/en/select.html + case ORACLE: + //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html + case POSTGRES: + //http://www.postgresql.org/docs/9.0/static/sql-select.html + case CUSTOM: // ANSI SQL + return selectStatement + " for update"; + case SQLSERVER: + //https://msdn.microsoft.com/en-us/library/ms189499.aspx + //https://msdn.microsoft.com/en-us/library/ms187373.aspx + String modifier = " with (updlock)"; + int wherePos = selectStatement.toUpperCase().indexOf(" WHERE "); + if (wherePos < 0) { + return selectStatement + modifier; + } + return selectStatement.substring(0, wherePos) + modifier + + selectStatement.substring(wherePos, selectStatement.length()); + default: + String msg = "Unrecognized database product name <" + dbType + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + + /** + * Add a limit clause to a given query + * @param numRows + * @param noSelectsqlQuery + * @return + * @throws MetaException + */ + public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { + switch (dbType) { + case DERBY: + case CUSTOM: // ANSI SQL + //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html + return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; + case MYSQL: + //http://www.postgresql.org/docs/7.3/static/queries-limit.html + case POSTGRES: + //https://dev.mysql.com/doc/refman/5.0/en/select.html + return "select " + noSelectsqlQuery + " limit " + numRows; + case ORACLE: + //newer versions (12c and later) support OFFSET/FETCH + return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows; + case SQLSERVER: + //newer versions (2012 and later) support OFFSET/FETCH + //https://msdn.microsoft.com/en-us/library/ms189463.aspx + return "select TOP(" + numRows + ") " + noSelectsqlQuery; + default: + String msg = "Unrecognized database product name <" + dbType + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + + /** + * Returns the SQL query to lock the given table name in either shared/exclusive mode + * @param txnLockTable + * @param shared + * @return + * @throws MetaException + */ + public String lockTable(String txnLockTable, boolean shared) throws MetaException { + switch (dbType) { + case MYSQL: + // For Mysql we do not use lock table statement for two reasons + // It is not released automatically on commit/rollback + // It requires to lock every table that will be used by the statement + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return "SELECT \"TXN_LOCK\" FROM \"" + txnLockTable + "\" " + (shared ? "LOCK IN SHARE MODE" : "FOR UPDATE"); + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + case CUSTOM: // ANSI SQL + return "LOCK TABLE \"" + txnLockTable + "\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE"; + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return "SELECT * FROM \"" + txnLockTable + "\" WITH (" + (shared ? "TABLOCK" : "TABLOCKX") + ", HOLDLOCK)"; + default: + String msg = "Unrecognized database product name <" + dbType + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + + public List<String> getResetTxnSequenceStmts() { + List<String> stmts = new ArrayList<>(); + switch (dbType) { + + case DERBY: + case CUSTOM: //ANSI SQL + stmts.add("ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH 1"); + stmts.add("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + break; + case MYSQL: + stmts.add("ALTER TABLE \"TXNS\" AUTO_INCREMENT=1"); + stmts.add("SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES'"); + stmts.add("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + break; + case POSTGRES: + stmts.add("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + stmts.add("ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART"); + break; + case ORACLE: + stmts.add("ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH 1)"); + stmts.add("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '_', '_')"); + break; + case SQLSERVER: + stmts.add("DBCC CHECKIDENT ('txns', RESEED, 0)"); + stmts.add("SET IDENTITY_INSERT TXNS ON"); + stmts.add("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + break; + case UNDEFINED: + default: + break; + } + return stmts; + } + + public String getTruncateStatement(String name) { + if (isPOSTGRES() || isMYSQL()) { + return ("DELETE FROM \"" + name + "\""); + } else { + return("DELETE FROM " + name); + } + } + + /** + * Checks if the dbms supports the getGeneratedKeys for multiline insert statements. + * @return true if supports + * @throws MetaException + */ + public boolean supportsGetGeneratedKeys() throws MetaException { + switch (dbType) { + case DERBY: + case CUSTOM: + case SQLSERVER: + // The getGeneratedKeys is not supported for multi line insert + return false; + case ORACLE: + case MYSQL: + case POSTGRES: + return true; + case UNDEFINED: + default: + String msg = "Unknown database product: " + dbType.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } + + public boolean isDuplicateKeyError(SQLException ex) { + switch (dbType) { + case DERBY: + case CUSTOM: // ANSI SQL + if("23505".equals(ex.getSQLState())) { + return true; + } + break; + case MYSQL: + //https://dev.mysql.com/doc/refman/5.5/en/error-messages-server.html + if((ex.getErrorCode() == 1022 || ex.getErrorCode() == 1062 || ex.getErrorCode() == 1586) + && "23000".equals(ex.getSQLState())) { + return true; + } + break; + case SQLSERVER: + //2627 is unique constaint violation incl PK, 2601 - unique key + if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) && "23000".equals(ex.getSQLState())) { + return true; + } + break; + case ORACLE: + if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) { + return true; + } + break; + case POSTGRES: + //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html + if("23505".equals(ex.getSQLState())) { + return true; + } + break; + default: + String msg = ex.getMessage() + + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" + ex.getErrorCode() + ")"; + throw new IllegalArgumentException("Unexpected DB type: " + dbType + "; " + msg); + } + return false; + + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @param rowsCountInStmts Output the number of rows in each insert statement returned. + * @param conf + * @return fully formed INSERT INTO ... statements + */ + public List<String> createInsertValuesStmt(String tblColumns, List<String> rows, + List<Integer> rowsCountInStmts, Configuration conf) { + List<String> insertStmts = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + int numRowsInCurrentStmt = 0; + switch (dbType) { + case ORACLE: + if (rows.size() > 1) { + //http://www.oratable.com/oracle-insert-all/ + //https://livesql.oracle.com/apex/livesql/file/content_BM1LJQ87M5CNIOKPOWPV6ZGR3.html + for (int numRows = 0; numRows < rows.size(); numRows++) { + if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + if (numRows > 0) { + sb.append(" select * from dual"); + insertStmts.add(sb.toString()); + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } + numRowsInCurrentStmt = 0; + } + sb.setLength(0); + sb.append("insert all "); + } + sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows)) + .append(") "); + numRowsInCurrentStmt++; + } + sb.append("select * from dual"); + insertStmts.add(sb.toString()); + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } + return insertStmts; + } + //fall through + case DERBY: + case MYSQL: + case POSTGRES: + case SQLSERVER: + case CUSTOM: + for (int numRows = 0; numRows < rows.size(); numRows++) { + if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + if (numRows > 0) { + insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } + numRowsInCurrentStmt = 0; + } + sb.setLength(0); + sb.append("insert into ").append(tblColumns).append(" values"); + } + sb.append('(').append(rows.get(numRows)).append("),"); + numRowsInCurrentStmt++; + } + insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } + return insertStmts; + default: + String msg = "Unrecognized database product name <" + dbType + ">"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + + public String addEscapeCharacters(String s) { + if (isMYSQL()) { + return s.replaceAll("\\\\", "\\\\\\\\"); + } + return s; + } + + /** + * Get datasource properties for connection pools + * + */ + public Map<String, String> getDataSourceProperties() { + Map<String, String> map = new HashMap<>(); + + switch (dbType){ + case MYSQL: + map.put("allowMultiQueries", "true"); + map.put("rewriteBatchedStatements", "true"); + break; + case POSTGRES: + map.put("reWriteBatchedInserts", "true"); + break; + default: + break; + } + return map; + } + + // This class implements the Configurable interface for the benefit + // of "plugin" instances created via reflection (see invocation of + // ReflectionUtils.newInstance in method determineDatabaseProduct) + @Override + public Configuration getConf() { + return myConf; + } + + @Override + public void setConf(Configuration c) { + myConf = c; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index b0ea48f..bc4a034 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -170,16 +170,13 @@ class MetaStoreDirectSql { this.conf = conf; this.schema = schema; DatabaseProduct dbType = null; - try { - dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm)); - } catch (SQLException e) { - LOG.warn("Cannot determine database product; assuming OTHER", e); - dbType = DatabaseProduct.OTHER; - } + + dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm), conf); + this.dbType = dbType; int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE); if (batchSize == DETECT_BATCHING) { - batchSize = DatabaseProduct.needsInBatching(dbType) ? 1000 : NO_BATCHING; + batchSize = dbType.needsInBatching() ? 1000 : NO_BATCHING; } this.batchSize = batchSize; ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder = @@ -528,7 +525,7 @@ class MetaStoreDirectSql { StringBuilder orderColumns = new StringBuilder(), orderClause = new StringBuilder(); int i = 0; List<Object> paramsForOrder = new ArrayList<Object>(); - boolean dbHasJoinCastBug = DatabaseProduct.hasJoinOperationOrderBug(dbType); + boolean dbHasJoinCastBug = dbType.hasJoinOperationOrderBug(); for (Object[] orderSpec: orderSpecs) { int partColIndex = (int)orderSpec[0]; String orderAlias = "ODR" + (i++); @@ -549,11 +546,7 @@ class MetaStoreDirectSql { PartitionFilterGenerator.FilterType type = PartitionFilterGenerator.FilterType.fromType(colType); if (type == PartitionFilterGenerator.FilterType.Date) { - if (dbType == DatabaseProduct.ORACLE) { - tableValue = "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')"; - } else { - tableValue = "cast(" + tableValue + " as date)"; - } + tableValue = dbType.toDate(tableValue); } else if (type == PartitionFilterGenerator.FilterType.Integral) { tableValue = "CAST(" + tableColumn + " AS decimal(21,0))"; } @@ -799,7 +792,7 @@ class MetaStoreDirectSql { SqlFilterForPushdown result) throws MetaException { // Derby and Oracle do not interpret filters ANSI-properly in some cases and need a workaround. assert partitionKeys != null; - boolean dbHasJoinCastBug = DatabaseProduct.hasJoinOperationOrderBug(dbType); + boolean dbHasJoinCastBug = dbType.hasJoinOperationOrderBug(); result.tableName = tableName; result.dbName = dbName; result.catName = catName; @@ -1358,12 +1351,7 @@ class MetaStoreDirectSql { if (colType == FilterType.Integral) { tableValue = "cast(" + tableValue + " as decimal(21,0))"; } else if (colType == FilterType.Date) { - if (dbType == DatabaseProduct.ORACLE) { - // Oracle requires special treatment... as usual. - tableValue = "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')"; - } else { - tableValue = "cast(" + tableValue + " as date)"; - } + tableValue = dbType.toDate(tableValue); } // Workaround for HIVE_DEFAULT_PARTITION - ignore it like JDO does, for now. @@ -1384,12 +1372,7 @@ class MetaStoreDirectSql { tableValue += " then " + tableValue0 + " else null end)"; if (valType == FilterType.Date) { - if (dbType == DatabaseProduct.ORACLE) { - // Oracle requires special treatment... as usual. - nodeValue0 = "TO_DATE(" + nodeValue0 + ", 'YYYY-MM-DD')"; - } else { - nodeValue0 = "cast(" + nodeValue0 + " as date)"; - } + tableValue = dbType.toDate(tableValue); } } if (!node.isReverseOrder) { @@ -2179,12 +2162,13 @@ class MetaStoreDirectSql { * effect will apply to the connection that is executing the queries otherwise. */ public void prepareTxn() throws MetaException { - if (dbType != DatabaseProduct.MYSQL) { + String stmt = dbType.getPrepareTxnStmt(); + if (stmt == null) { return; } try { assert pm.currentTransaction().isActive(); // must be inside tx together with queries - executeNoResult("SET @@session.sql_mode=ANSI_QUOTES"); + executeNoResult(stmt); } catch (SQLException sqlEx) { throw new MetaException("Error setting ansi quotes: " + sqlEx.getMessage()); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 71d2de4..2fd776c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -374,13 +374,10 @@ public class ObjectStore implements RawStore, Configurable { pm = PersistenceManagerProvider.getPersistenceManager(); LOG.info("RawStore: {}, with PersistenceManager: {}" + " created in the thread with id: {}", this, pm, Thread.currentThread().getId()); - try { - String productName = MetaStoreDirectSql.getProductName(pm); - sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf); - } catch (SQLException e) { - LOG.error("error trying to figure out the database product", e); - throw new RuntimeException(e); - } + + String productName = MetaStoreDirectSql.getProductName(pm); + sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName, conf), conf); + isInitialized = pm != null; if (isInitialized) { dbType = determineDatabaseProduct(); @@ -394,12 +391,7 @@ public class ObjectStore implements RawStore, Configurable { } private DatabaseProduct determineDatabaseProduct() { - try { - return DatabaseProduct.determineDatabaseProduct(getProductName(pm)); - } catch (SQLException e) { - LOG.warn("Cannot determine database product; assuming OTHER", e); - return DatabaseProduct.OTHER; - } + return DatabaseProduct.determineDatabaseProduct(getProductName(pm), conf); } private static String getProductName(PersistenceManager pm) { @@ -8779,7 +8771,7 @@ public class ObjectStore implements RawStore, Configurable { if (oldStats != null) { StatObjectConverter.setFieldsIntoOldStats(mStatsObj, oldStats); } else { - if (sqlGenerator.getDbProduct().equals(DatabaseProduct.POSTGRES) && mStatsObj.getBitVector() == null) { + if (sqlGenerator.getDbProduct().isPOSTGRES() && mStatsObj.getBitVector() == null) { // workaround for DN bug in persisting nulls in pg bytea column // instead set empty bit vector with header. mStatsObj.setBitVector(new byte[] { 'H', 'L' }); @@ -8818,7 +8810,7 @@ public class ObjectStore implements RawStore, Configurable { if (oldStats != null) { StatObjectConverter.setFieldsIntoOldStats(mStatsObj, oldStats); } else { - if (sqlGenerator.getDbProduct().equals(DatabaseProduct.POSTGRES) && mStatsObj.getBitVector() == null) { + if (sqlGenerator.getDbProduct().isPOSTGRES() && mStatsObj.getBitVector() == null) { // workaround for DN bug in persisting nulls in pg bytea column // instead set empty bit vector with header. mStatsObj.setBitVector(new byte[] { 'H', 'L' }); @@ -10509,11 +10501,12 @@ public class ObjectStore implements RawStore, Configurable { } private void prepareQuotes() throws SQLException { - if (dbType == DatabaseProduct.MYSQL) { + String s = dbType.getPrepareTxnStmt(); + if (s != null) { assert pm.currentTransaction().isActive(); JDOConnection jdoConn = pm.getDataStoreConnection(); try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) { - statement.execute("SET @@session.sql_mode=ANSI_QUOTES"); + statement.execute(s); } finally { jdoConn.close(); } @@ -10521,7 +10514,7 @@ public class ObjectStore implements RawStore, Configurable { } private void lockNotificationSequenceForUpdate() throws MetaException { - if (sqlGenerator.getDbProduct() == DatabaseProduct.DERBY && directSql != null) { + if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) { // Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache // .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's // only one row in the table, this shouldn't cause any performance degradation. diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java index 1a5a1d2..4069a93 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hive.metastore.datasource; -import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; -import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct; - import java.sql.SQLException; +import java.util.Map; import javax.sql.DataSource; @@ -73,18 +71,12 @@ public class DbCPDataSourceProvider implements DataSourceProvider { dbcpDs.setDefaultReadOnly(false); dbcpDs.setDefaultAutoCommit(true); - DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl); - switch (dbProduct){ - case MYSQL: - dbcpDs.setConnectionProperties("allowMultiQueries=true"); - dbcpDs.setConnectionProperties("rewriteBatchedStatements=true"); - break; - case POSTGRES: - dbcpDs.setConnectionProperties("reWriteBatchedInserts=true"); - break; - default: - break; + DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig); + Map<String, String> props = dbProduct.getDataSourceProperties(); + for (Map.Entry<String, String> kv : props.entrySet()) { + dbcpDs.setConnectionProperties(kv.getKey() + "=" + kv.getValue()); } + int maxPoolSize = hdpConfig.getInt( MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(), ((Long) MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getDefaultVal()).intValue()); @@ -123,8 +115,9 @@ public class DbCPDataSourceProvider implements DataSourceProvider { objectPool.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis); objectPool.setLifo(lifo); - if (dbProduct == MYSQL) { - poolableConnFactory.setValidationQuery("SET @@session.sql_mode=ANSI_QUOTES"); + String stmt = dbProduct.getPrepareTxnStmt(); + if (stmt != null) { + poolableConnFactory.setValidationQuery(stmt); } return new PoolingDataSource(objectPool); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java index 8a4a9c2..6ed5d4a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java @@ -29,10 +29,9 @@ import org.slf4j.LoggerFactory; import javax.sql.DataSource; import java.sql.SQLException; +import java.util.Map; import java.util.Properties; -import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct; - /** * DataSourceProvider for the HikariCP connection pool. */ @@ -72,17 +71,19 @@ public class HikariCPDataSourceProvider implements DataSourceProvider { //https://github.com/brettwooldridge/HikariCP config.setConnectionTimeout(connectionTimeout); - DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl); - switch (dbProduct){ - case MYSQL: - config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES"); - config.addDataSourceProperty("allowMultiQueries", true); - config.addDataSourceProperty("rewriteBatchedStatements", true); - break; - case POSTGRES: - config.addDataSourceProperty("reWriteBatchedInserts", true); - break; + DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig); + + String s = dbProduct.getPrepareTxnStmt(); + if (s!= null) { + config.setConnectionInitSql(s); + } + + Map<String, String> props = dbProduct.getDataSourceProperties(); + + for ( Map.Entry<String, String> kv : props.entrySet()) { + config.addDataSourceProperty(kv.getKey(), kv.getValue()); } + return new HikariDataSource(initMetrics(config)); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index 08ef5e9..e207d21 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -21,8 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,68 +119,7 @@ public final class SQLGenerator { if (rows == null || rows.size() == 0) { return Collections.emptyList(); } - List<String> insertStmts = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - int numRowsInCurrentStmt = 0; - switch (dbProduct) { - case ORACLE: - if (rows.size() > 1) { - //http://www.oratable.com/oracle-insert-all/ - //https://livesql.oracle.com/apex/livesql/file/content_BM1LJQ87M5CNIOKPOWPV6ZGR3.html - for (int numRows = 0; numRows < rows.size(); numRows++) { - if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { - if (numRows > 0) { - sb.append(" select * from dual"); - insertStmts.add(sb.toString()); - if (rowsCountInStmts != null) { - rowsCountInStmts.add(numRowsInCurrentStmt); - } - numRowsInCurrentStmt = 0; - } - sb.setLength(0); - sb.append("insert all "); - } - sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows)) - .append(") "); - numRowsInCurrentStmt++; - } - sb.append("select * from dual"); - insertStmts.add(sb.toString()); - if (rowsCountInStmts != null) { - rowsCountInStmts.add(numRowsInCurrentStmt); - } - return insertStmts; - } - //fall through - case DERBY: - case MYSQL: - case POSTGRES: - case SQLSERVER: - for (int numRows = 0; numRows < rows.size(); numRows++) { - if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { - if (numRows > 0) { - insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma - if (rowsCountInStmts != null) { - rowsCountInStmts.add(numRowsInCurrentStmt); - } - numRowsInCurrentStmt = 0; - } - sb.setLength(0); - sb.append("insert into ").append(tblColumns).append(" values"); - } - sb.append('(').append(rows.get(numRows)).append("),"); - numRowsInCurrentStmt++; - } - insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma - if (rowsCountInStmts != null) { - rowsCountInStmts.add(numRowsInCurrentStmt); - } - return insertStmts; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new IllegalStateException(msg); - } + return dbProduct.createInsertValuesStmt(tblColumns, rows, rowsCountInStmts, conf); } /** @@ -190,33 +127,7 @@ public final class SQLGenerator { * construct. If the DB doesn't support, return original select. */ public String addForUpdateClause(String selectStatement) throws MetaException { - switch (dbProduct) { - case DERBY: - //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html - //sadly in Derby, FOR UPDATE doesn't meant what it should - return selectStatement; - case MYSQL: - //http://dev.mysql.com/doc/refman/5.7/en/select.html - case ORACLE: - //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html - case POSTGRES: - //http://www.postgresql.org/docs/9.0/static/sql-select.html - return selectStatement + " for update"; - case SQLSERVER: - //https://msdn.microsoft.com/en-us/library/ms189499.aspx - //https://msdn.microsoft.com/en-us/library/ms187373.aspx - String modifier = " with (updlock)"; - int wherePos = selectStatement.toUpperCase().indexOf(" WHERE "); - if (wherePos < 0) { - return selectStatement + modifier; - } - return selectStatement.substring(0, wherePos) + modifier + - selectStatement.substring(wherePos, selectStatement.length()); - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } + return dbProduct.addForUpdateClause(selectStatement); } /** @@ -229,27 +140,7 @@ public final class SQLGenerator { * all columns are unique for Oracle. */ public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { - switch (dbProduct) { - case DERBY: - //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html - return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; - case MYSQL: - //http://www.postgresql.org/docs/7.3/static/queries-limit.html - case POSTGRES: - //https://dev.mysql.com/doc/refman/5.0/en/select.html - return "select " + noSelectsqlQuery + " limit " + numRows; - case ORACLE: - //newer versions (12c and later) support OFFSET/FETCH - return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows; - case SQLSERVER: - //newer versions (2012 and later) support OFFSET/FETCH - //https://msdn.microsoft.com/en-us/library/ms189463.aspx - return "select TOP(" + numRows + ") " + noSelectsqlQuery; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } + return dbProduct.addLimitClause(numRows, noSelectsqlQuery); } /** @@ -286,13 +177,9 @@ public final class SQLGenerator { // This is required for SQL executed directly. If the SQL has double quotes then some dbs tend to // remove the escape characters and store the variable without double quote. public String addEscapeCharacters(String s) { - if (dbProduct == DatabaseProduct.MYSQL) { - return s.replaceAll("\\\\", "\\\\\\\\"); - } - return s; + return dbProduct.addEscapeCharacters(s); } - /** * Creates a lock statement for open/commit transaction based on the dbProduct in shared read / exclusive mode. * @param shared shared or exclusive lock @@ -301,27 +188,6 @@ public final class SQLGenerator { */ public String createTxnLockStatement(boolean shared) throws MetaException{ String txnLockTable = "TXN_LOCK_TBL"; - switch (dbProduct) { - case MYSQL: - // For Mysql we do not use lock table statement for two reasons - // It is not released automatically on commit/rollback - // It requires to lock every table that will be used by the statement - // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html - return "SELECT \"TXN_LOCK\" FROM \"" + txnLockTable + "\" " + (shared ? "LOCK IN SHARE MODE" : "FOR UPDATE"); - case POSTGRES: - // https://www.postgresql.org/docs/9.4/sql-lock.html - case DERBY: - // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html - case ORACLE: - // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm - return "LOCK TABLE \"" + txnLockTable + "\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE"; - case SQLSERVER: - // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 - return "SELECT * FROM \"" + txnLockTable + "\" WITH (" + (shared ? "TABLOCK" : "TABLOCKX") + ", HOLDLOCK)"; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } + return dbProduct.lockTable(txnLockTable, shared); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 8b5b4e0..aff9e12 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -28,10 +28,8 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLTransactionRollbackException; import java.sql.Statement; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; -import java.util.EnumMap; import java.util.HashSet; import java.util.List; import java.util.Properties; @@ -63,26 +61,6 @@ public final class TxnDbUtil { private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName()); private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; - private static final EnumMap<DatabaseProduct, String> DB_EPOCH_FN = - new EnumMap<DatabaseProduct, String>(DatabaseProduct.class) {{ - put(DERBY, "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) + - "'), current_timestamp) } / 1000000"); - put(MYSQL, "round(unix_timestamp(now(3)) * 1000)"); - put(POSTGRES, "round(extract(epoch from current_timestamp) * 1000)"); - put(ORACLE, "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " + - "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)"); - put(SQLSERVER, "datediff_big(millisecond, '19700101', sysutcdatetime())"); - }}; - - private static final EnumMap<DatabaseProduct, String> DB_SEED_FN = - new EnumMap<DatabaseProduct, String>(DatabaseProduct.class) {{ - put(DERBY, "ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH %s"); - put(MYSQL, "ALTER TABLE \"TXNS\" AUTO_INCREMENT = %s"); - put(POSTGRES, "ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART WITH %s"); - put(ORACLE, "ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH %s )"); - put(SQLSERVER, "DBCC CHECKIDENT ('txns', RESEED, %s )"); - }}; - private static int deadlockCnt = 0; private TxnDbUtil() { @@ -115,14 +93,14 @@ public final class TxnDbUtil { try { conn = getConnection(conf); String s = conn.getMetaData().getDatabaseProductName(); - DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(s); + DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(s, conf); stmt = conn.createStatement(); if (checkDbPrepared(stmt)) { return; } String schemaRootPath = getSchemaRootPath(); IMetaStoreSchemaInfo metaStoreSchemaInfo = - MetaStoreSchemaInfoFactory.get(conf, schemaRootPath, DatabaseProduct.getHiveSchemaPostfix(dbProduct)); + MetaStoreSchemaInfoFactory.get(conf, schemaRootPath, dbProduct.getHiveSchemaPostfix()); String initFile = metaStoreSchemaInfo.generateInitFileName(null); try (InputStream is = new FileInputStream( metaStoreSchemaInfo.getMetaStoreScriptDir() + File.separator + initFile)) { @@ -240,22 +218,22 @@ public final class TxnDbUtil { } // We want to try these, whether they succeed or fail. - success &= truncateTable(conn, stmt, "TXN_COMPONENTS"); - success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS"); - success &= truncateTable(conn, stmt, "TXNS"); - success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID"); - success &= truncateTable(conn, stmt, "NEXT_WRITE_ID"); - success &= truncateTable(conn, stmt, "HIVE_LOCKS"); - success &= truncateTable(conn, stmt, "NEXT_LOCK_ID"); - success &= truncateTable(conn, stmt, "COMPACTION_QUEUE"); - success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID"); - success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS"); - success &= truncateTable(conn, stmt, "AUX_TABLE"); - success &= truncateTable(conn, stmt, "WRITE_SET"); - success &= truncateTable(conn, stmt, "REPL_TXN_MAP"); - success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS"); + success &= truncateTable(conn, conf, stmt, "TXN_COMPONENTS"); + success &= truncateTable(conn, conf, stmt, "COMPLETED_TXN_COMPONENTS"); + success &= truncateTable(conn, conf, stmt, "TXNS"); + success &= truncateTable(conn, conf, stmt, "TXN_TO_WRITE_ID"); + success &= truncateTable(conn, conf, stmt, "NEXT_WRITE_ID"); + success &= truncateTable(conn, conf, stmt, "HIVE_LOCKS"); + success &= truncateTable(conn, conf, stmt, "NEXT_LOCK_ID"); + success &= truncateTable(conn, conf, stmt, "COMPACTION_QUEUE"); + success &= truncateTable(conn, conf, stmt, "NEXT_COMPACTION_QUEUE_ID"); + success &= truncateTable(conn, conf, stmt, "COMPLETED_COMPACTIONS"); + success &= truncateTable(conn, conf, stmt, "AUX_TABLE"); + success &= truncateTable(conn, conf, stmt, "WRITE_SET"); + success &= truncateTable(conn, conf, stmt, "REPL_TXN_MAP"); + success &= truncateTable(conn, conf, stmt, "MATERIALIZATION_REBUILD_LOCKS"); try { - resetTxnSequence(conn, stmt); + resetTxnSequence(conn, conf, stmt); stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)"); stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)"); } catch (SQLException e) { @@ -279,46 +257,14 @@ public final class TxnDbUtil { throw new RuntimeException("Failed to clean up txn tables"); } - private static void resetTxnSequence(Connection conn, Statement stmt) throws SQLException, MetaException{ + private static void resetTxnSequence(Connection conn, Configuration conf, + Statement stmt) throws SQLException, MetaException { String dbProduct = conn.getMetaData().getDatabaseProductName(); - DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct); - switch (databaseProduct) { - - case DERBY: - stmt.execute("ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH 1"); - stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," - + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" - + " VALUES(0, 'c', 0, 0, '', '')"); - break; - case MYSQL: - stmt.execute("ALTER TABLE \"TXNS\" AUTO_INCREMENT=1"); - stmt.execute("SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES'"); - stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," - + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" - + " VALUES(0, 'c', 0, 0, '', '')"); - break; - case POSTGRES: - stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," - + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" - + " VALUES(0, 'c', 0, 0, '', '')"); - stmt.execute("ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART"); - break; - case ORACLE: - stmt.execute("ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH 1)"); - stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," - + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" - + " VALUES(0, 'c', 0, 0, '_', '_')"); - break; - case SQLSERVER: - stmt.execute("DBCC CHECKIDENT ('txns', RESEED, 0)"); - stmt.execute("SET IDENTITY_INSERT TXNS ON"); - stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," - + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" - + " VALUES(0, 'c', 0, 0, '', '')"); - break; - case OTHER: - default: - break; + DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf); + List<String> stmts = databaseProduct.getResetTxnSequenceStmts(); + + for (String s : stmts ) { + stmt.execute(s); } } @@ -330,22 +276,19 @@ public final class TxnDbUtil { * @param seedTxnId the seed value for the sequence * @throws SQLException ex */ - public static void seedTxnSequence(Connection conn, Statement stmt, long seedTxnId) throws SQLException { + public static void seedTxnSequence(Connection conn, Configuration conf, Statement stmt, long seedTxnId) throws SQLException { String dbProduct = conn.getMetaData().getDatabaseProductName(); - DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct); - stmt.execute(String.format(DB_SEED_FN.get(databaseProduct), seedTxnId)); + DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf); + stmt.execute(databaseProduct.getTxnSeedFn(seedTxnId)); } - private static boolean truncateTable(Connection conn, Statement stmt, String name) { + private static boolean truncateTable(Connection conn, Configuration conf, Statement stmt, String name) { try { // We can not use actual truncate due to some foreign keys, but we don't expect much data during tests String dbProduct = conn.getMetaData().getDatabaseProductName(); - DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct); - if (databaseProduct == POSTGRES || databaseProduct == MYSQL) { - stmt.execute("DELETE FROM \"" + name + "\""); - } else { - stmt.execute("DELETE FROM " + name); - } + DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf); + String s = databaseProduct.getTruncateStatement(name); + stmt.execute(s); LOG.debug("Successfully truncated table " + name); return true; @@ -493,14 +436,7 @@ public final class TxnDbUtil { * @throws MetaException For unknown database type. */ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException { - String epochFn = DB_EPOCH_FN.get(dbProduct); - if (epochFn != null) { - return epochFn; - } else { - String msg = "Unknown database product: " + dbProduct.toString(); - LOG.error(msg); - throw new MetaException(msg); - } + return dbProduct.getMillisAfterEpochFn(); } /** @@ -515,7 +451,7 @@ public final class TxnDbUtil { * @throws SQLException Thrown if an execution error occurs. */ static void executeQueriesInBatchNoCount(DatabaseProduct dbProduct, Statement stmt, List<String> queries, int batchSize) throws SQLException { - if (dbProduct == ORACLE) { + if (dbProduct.isORACLE()) { int queryCounter = 0; StringBuilder sb = new StringBuilder(); sb.append("begin "); @@ -570,28 +506,4 @@ public final class TxnDbUtil { } return affectedRowsByQuery; } - - /** - + * Checks if the dbms supports the getGeneratedKeys for multiline insert statements. - + * @param dbProduct DBMS type - + * @return true if supports - + * @throws MetaException - + */ - public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException { - switch (dbProduct) { - case DERBY: - case SQLSERVER: - // The getGeneratedKeys is not supported for multi line insert - return false; - case ORACLE: - case MYSQL: - case POSTGRES: - return true; - case OTHER: - default: - String msg = "Unknown database product: " + dbProduct.toString(); - LOG.error(msg); - throw new MetaException(msg); - } - } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2503876..0d6ea9d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -151,7 +151,6 @@ import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatch; import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatchNoCount; import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.getEpochFn; @@ -634,7 +633,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * If the getGeneratedKeys are not supported first we insert a random batchId in the TXN_META_INFO field, * then the keys are selected beck with that batchid. */ - boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct); + boolean genKeySupport = dbProduct.supportsGetGeneratedKeys(); genKeySupport = genKeySupport || (numTxns == 1); String insertQuery = String.format(TXNS_INSERT_QRY, TxnDbUtil.getEpochFn(dbProduct), @@ -1036,8 +1035,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try { stmt = dbConn.createStatement(); - if (sqlGenerator.getDbProduct() == MYSQL) { - stmt.execute("SET @@session.sql_mode=ANSI_QUOTES"); + String s = sqlGenerator.getDbProduct().getPrepareTxnStmt(); + if (s != null) { + stmt.execute(s); } String query = "select \"DB_ID\" from \"DBS\" where \"NAME\" = ? and \"CTLG_NAME\" = ?"; @@ -2167,7 +2167,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { throw new MetaException(MessageFormat .format("Invalid txnId seed {}, the highWaterMark is {}", rqst.getSeedTxnId(), highWaterMark)); } - TxnDbUtil.seedTxnSequence(dbConn, stmt, rqst.getSeedTxnId()); + TxnDbUtil.seedTxnSequence(dbConn, conf, stmt, rqst.getSeedTxnId()); dbConn.commit(); } catch (SQLException e) { @@ -4132,7 +4132,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if(dbProduct == null) { throw new IllegalStateException("DB Type not determined yet."); } - if (DatabaseProduct.isDeadlock(dbProduct, e)) { + if (dbProduct.isDeadlock(e)) { if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { long waitInterval = deadlockRetryInterval * deadlockCnt; LOG.warn("Deadlock detected in " + caller + ". Will wait " + waitInterval + @@ -4181,27 +4181,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { Statement stmt = null; try { stmt = conn.createStatement(); - String s; - switch (dbProduct) { - case DERBY: - s = "values current_timestamp"; - break; - - case MYSQL: - case POSTGRES: - case SQLSERVER: - s = "select current_timestamp"; - break; - - case ORACLE: - s = "select current_timestamp from dual"; - break; + String s = dbProduct.getDBTime(); - default: - String msg = "Unknown database product: " + dbProduct.toString(); - LOG.error(msg); - throw new MetaException(msg); - } LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) throw new MetaException("No results from date query"); @@ -4216,27 +4197,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } protected String isWithinCheckInterval(String expr, long interval) throws MetaException { - String condition; - switch (dbProduct) { - case DERBY: - condition = " {fn TIMESTAMPDIFF(sql_tsi_second, " + expr + ", current_timestamp)} <= " + interval; - break; - case MYSQL: - case POSTGRES: - condition = expr + " >= current_timestamp - interval '" + interval + "' second"; - break; - case SQLSERVER: - condition = "DATEDIFF(second, " + expr + ", current_timestamp) <= " + interval; - break; - case ORACLE: - condition = expr + " >= current_timestamp - numtodsinterval(" + interval + " , 'second')"; - break; - default: - String msg = "Unknown database product: " + dbProduct.toString(); - LOG.error(msg); - throw new MetaException(msg); - } - return condition; + return dbProduct.isWithinCheckInterval(expr, interval); } /** @@ -4257,8 +4218,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (dbProduct != null) return; try { String s = conn.getMetaData().getDatabaseProductName(); - dbProduct = DatabaseProduct.determineDatabaseProduct(s); - if (dbProduct == DatabaseProduct.OTHER) { + dbProduct = DatabaseProduct.determineDatabaseProduct(s, conf); + if (dbProduct.isUNDEFINED()) { String msg = "Unrecognized database product name <" + s + ">"; LOG.error(msg); throw new IllegalStateException(msg); @@ -5173,42 +5134,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } return false; } + private boolean isDuplicateKeyError(SQLException ex) { - switch (dbProduct) { - case DERBY: - if("23505".equals(ex.getSQLState())) { - return true; - } - break; - case MYSQL: - //https://dev.mysql.com/doc/refman/5.5/en/error-messages-server.html - if((ex.getErrorCode() == 1022 || ex.getErrorCode() == 1062 || ex.getErrorCode() == 1586) - && "23000".equals(ex.getSQLState())) { - return true; - } - break; - case SQLSERVER: - //2627 is unique constaint violation incl PK, 2601 - unique key - if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) && "23000".equals(ex.getSQLState())) { - return true; - } - break; - case ORACLE: - if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) { - return true; - } - break; - case POSTGRES: - //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html - if("23505".equals(ex.getSQLState())) { - return true; - } - break; - default: - throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex)); - } - return false; + return dbProduct.isDuplicateKeyError(ex); } + private static String getMessage(SQLException ex) { return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" + ex.getErrorCode() + ")"; } @@ -5247,12 +5177,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * with Derby database. See more notes at class level. */ private void lockInternal() { - if(dbProduct == DatabaseProduct.DERBY) { + if(dbProduct.isDERBY()) { derbyLock.lock(); } } private void unlockInternal() { - if(dbProduct == DatabaseProduct.DERBY) { + if(dbProduct.isDERBY()) { derbyLock.unlock(); } } @@ -5307,7 +5237,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } Semaphore derbySemaphore = null; - if(dbProduct == DatabaseProduct.DERBY) { + if(dbProduct.isDERBY()) { derbyKey2Lock.putIfAbsent(key, new Semaphore(1)); derbySemaphore = derbyKey2Lock.get(key); derbySemaphore.acquire(); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java new file mode 100644 index 0000000..053fbe4 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dummy custom database product - companion class to enable testing by TestTxnUtils + */ +public class DummyCustomRDBMS extends DatabaseProduct { + static final private Logger LOG = LoggerFactory.getLogger(DummyCustomRDBMS.class.getName()); + + public DummyCustomRDBMS() { + LOG.info("Instantiating custom RDBMS"); + } + @Override + public boolean isDeadlock(SQLException e) { + return true; + } + @Override + public boolean needsInBatching() { + return true; + } + @Override + public boolean hasJoinOperationOrderBug() { + return true; + } + @Override + public String getHiveSchemaPostfix() { + return "DummyPostfix"; + } + @Override + protected String toDate(String tableValue) { + return "DummyDate"; + } + @Override + public String getPrepareTxnStmt() { + return "DummyPrepare"; + } + @Override + public String getMillisAfterEpochFn() { + return "DummyFn"; + } + @Override + public String getDBTime() throws MetaException { + return super.getDBTime(); + } + @Override + public String isWithinCheckInterval(String expr, long intervalInSeconds) { + return "DummyIsWithin"; + } + @Override + public String addForUpdateClause(String selectStatement) { + return selectStatement + " for update"; + } + @Override + public String addLimitClause(int numRows, String noSelectsqlQuery) { + return "limit " + numRows; + } + @Override + public String lockTable(String txnLockTable, boolean shared) { + return "DummyLock"; + } + @Override + public List<String> getResetTxnSequenceStmts() { + return Arrays.asList(new String[]{"DummyStmt"}); + } + @Override + public String getTruncateStatement(String name) { + return super.getTruncateStatement(name); + } + @Override + public boolean supportsGetGeneratedKeys() { + return true; + } + @Override + public boolean isDuplicateKeyError(SQLException ex) { + return true; + } + @Override + public List<String> createInsertValuesStmt(String tblColumns, List<String> rows, + List<Integer> rowsCountInStmts, Configuration conf) { + return Arrays.asList(new String[]{"DummyStmt"}); + } + @Override + public String addEscapeCharacters(String s) { + return s; + } + @Override + public Map<String, String> getDataSourceProperties() { + return null; + } + @Override + public Configuration getConf() { + myConf.set("DummyKey", "DummyValue"); + return myConf; + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java index 60be0f9..fcd6699 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DatabaseProduct; +import org.apache.hadoop.hive.metastore.DummyCustomRDBMS; import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -176,8 +177,10 @@ public class TestTxnUtils { @Test public void testSQLGenerator() throws Exception { //teseted on Oracle Database 11g Express Edition Release 11.2.0.2.0 - 64bit Production + + DatabaseProduct.reset(); SQLGenerator sqlGenerator = - new SQLGenerator(DatabaseProduct.ORACLE, conf); + new SQLGenerator(DatabaseProduct.determineDatabaseProduct(DatabaseProduct.ORACLE_NAME, conf), conf); List<String> rows = new ArrayList<>(); rows.add("'yellow', 1"); List<String> sql = sqlGenerator.createInsertValuesStmt("colors(name, category)", rows); @@ -198,8 +201,9 @@ public class TestTxnUtils { Assert.assertEquals("Wrong stmt", "insert all into colors(name, category) values('yellow', 1) into colors(name, category) values('red', 2) into colors(name, category) values('orange', 3) into colors(name, category) values('G',0) into colors(name, category) values('G',1) into colors(name, category) values('G',2) into colors(name, category) values('G',3) into colors(name, category) values('G',4) into colors(name, category) values('G',5) into colors(name, category) values('G',6) into co [...] Assert.assertEquals("Wrong stmt", "insert all into colors(name, category) values('G',997) into colors(name, category) values('G',998) into colors(name, category) values('G',999) select * from dual", sql.get(1)); + DatabaseProduct.reset(); sqlGenerator = - new SQLGenerator(DatabaseProduct.MYSQL, conf); + new SQLGenerator(DatabaseProduct.determineDatabaseProduct(DatabaseProduct.MYSQL_NAME, conf), conf); rows.clear(); rows.add("'yellow', 1"); sql = sqlGenerator.createInsertValuesStmt("colors(name, category)", rows); @@ -218,22 +222,42 @@ public class TestTxnUtils { Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('yellow', 1),('red', 2),('orange', 3),('G',0),('G',1),('G',2),('G',3),('G',4),('G',5),('G',6),('G',7),('G',8),('G',9),('G',10),('G',11),('G',12),('G',13),('G',14),('G',15),('G',16),('G',17),('G',18),('G',19),('G',20),('G',21),('G',22),('G',23),('G',24),('G',25),('G',26),('G',27),('G',28),('G',29),('G',30),('G',31),('G',32),('G',33),('G',34),('G',35),('G',36),('G',37),('G',38),('G',39),('G',40),('G',41),('G', [...] Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('G',997),('G',998),('G',999)", sql.get(1)); - sqlGenerator = new SQLGenerator(DatabaseProduct.SQLSERVER, conf); + DatabaseProduct.reset(); + sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(DatabaseProduct.SQL_SERVER_NAME, conf), conf); String modSql = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID"); Assert.assertEquals("select nl_next from NEXT_LOCK_ID with (updlock)", modSql); modSql = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1='CheckLock' and MT_KEY2=0"); Assert.assertEquals("select MT_COMMENT from AUX_TABLE with (updlock) where MT_KEY1='CheckLock' and MT_KEY2=0", modSql); } + @Test + public void testCustomRDBMS() throws Exception { + MetastoreConf.setBoolVar(conf, ConfVars.USE_CUSTOM_RDBMS, true); + MetastoreConf.setVar(conf, ConfVars.CUSTOM_RDBMS_CLASSNAME, DummyCustomRDBMS.class.getName()); + DatabaseProduct.reset(); + DatabaseProduct db = DatabaseProduct.determineDatabaseProduct(DatabaseProduct.UNDEFINED_NAME, conf); + + Assert.assertEquals(db.getHiveSchemaPostfix(), "DummyPostfix"); + Assert.assertEquals(db.getDBTime(), "values current_timestamp"); + + Configuration c = db.getConf(); + Assert.assertEquals(c.get("DummyKey"), "DummyValue"); + //Cleanup conf + MetastoreConf.setBoolVar(conf, ConfVars.USE_CUSTOM_RDBMS, false); + MetastoreConf.setVar(conf, ConfVars.CUSTOM_RDBMS_CLASSNAME, ""); + } + @Before public void setUp() throws Exception { conf = MetastoreConf.newMetastoreConf(); TxnDbUtil.setConfValues(conf); + DatabaseProduct.reset(); TxnDbUtil.prepDb(conf); } @After public void tearDown() throws Exception { + DatabaseProduct.reset(); TxnDbUtil.cleanDb(conf); } }