HIVE-16886: HMS log notifications may have duplicated event IDs if multiple HMS are running concurrently (Anishek Agarwal, reviewed by Sergio Peña, Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/92f9d8fb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/92f9d8fb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/92f9d8fb Branch: refs/heads/hive-14535 Commit: 92f9d8fb4de7896cc9161bfbd461fac4f2917a35 Parents: 076bd77 Author: Daniel Dai <da...@hortonworks.com> Authored: Mon Sep 11 01:14:03 2017 -0700 Committer: Daniel Dai <da...@hortonworks.com> Committed: Mon Sep 11 01:14:03 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 7 + .../listener/DbNotificationListener.java | 14 +- .../upgrade/derby/045-HIVE-16886.derby.sql | 1 + .../upgrade/derby/hive-schema-3.0.0.derby.sql | 6 + .../derby/upgrade-2.3.0-to-3.0.0.derby.sql | 1 + .../upgrade/mssql/030-HIVE-16886.mssql.sql | 1 + .../upgrade/mssql/hive-schema-3.0.0.mssql.sql | 2 + .../mssql/upgrade-2.3.0-to-3.0.0.mssql.sql | 1 + .../upgrade/mysql/045-HIVE-16886.mysql.sql | 1 + .../upgrade/mysql/hive-schema-3.0.0.mysql.sql | 2 + .../mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 1 + .../upgrade/oracle/045-HIVE-16886.oracle.sql | 1 + .../upgrade/oracle/hive-schema-3.0.0.oracle.sql | 1 + .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 1 + .../postgres/044-HIVE-16886.postgres.sql | 1 + .../postgres/hive-schema-3.0.0.postgres.sql | 2 + .../upgrade-2.3.0-to-3.0.0.postgres.sql | 1 + .../hive/metastore/MetaStoreDirectSql.java | 17 +- .../hadoop/hive/metastore/ObjectStore.java | 102 +++++++++-- .../hive/metastore/tools/SQLGenerator.java | 172 +++++++++++++++++++ .../hadoop/hive/metastore/txn/TxnHandler.java | 134 +-------------- .../hadoop/hive/metastore/TestObjectStore.java | 163 ++++++++++++++---- .../hadoop/hive/metastore/txn/TestTxnUtils.java | 9 +- 23 files changed, 449 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cf3f50b..c9d75c0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -779,6 +779,13 @@ public class HiveConf extends Configuration { METASTORE_TRANSACTIONAL_EVENT_LISTENERS("hive.metastore.transactional.event.listeners", "", "A comma separated list of Java classes that implement the org.apache.hadoop.hive.metastore.MetaStoreEventListener" + " interface. Both the metastore event and corresponding listener method will be invoked in the same JDO transaction."), + NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES("hive.notification.sequence.lock.max.retries", 5, + "Number of retries required to acquire a lock when getting the next notification sequential ID for entries " + + "in the NOTIFICATION_LOG table."), + NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL("hive.notification.sequence.lock.retry.sleep.interval", 500, + new TimeValidator(TimeUnit.MILLISECONDS), + "Sleep interval between retries to acquire a notification lock as described part of property " + + NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES.name()), METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", "86400s", new TimeValidator(TimeUnit.SECONDS), "time after which events will be removed from the database listener queue"), http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 68d3cc1..f807055 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -92,8 +92,6 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener private static final Logger LOG = LoggerFactory.getLogger(DbNotificationListener.class.getName()); private static CleanerThread cleaner = null; - private static final Object NOTIFICATION_TBL_LOCK = new Object(); - // This is the same object as super.conf, but it's convenient to keep a copy of it as a // HiveConf rather than a Configuration. private HiveConf hiveConf; @@ -573,11 +571,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener */ private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); - synchronized (NOTIFICATION_TBL_LOCK) { - LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), - event.getMessage()); - HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); - } + LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), + event.getMessage()); + HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. 
if (event.isSetEventId()) { @@ -603,9 +599,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener @Override public void run() { while (true) { - synchronized(NOTIFICATION_TBL_LOCK) { - rs.cleanNotificationEvents(ttl); - } + rs.cleanNotificationEvents(ttl); LOG.debug("Cleaner thread done"); try { Thread.sleep(sleepTime); http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/derby/045-HIVE-16886.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/045-HIVE-16886.derby.sql b/metastore/scripts/upgrade/derby/045-HIVE-16886.derby.sql new file mode 100644 index 0000000..d6a1ce8 --- /dev/null +++ b/metastore/scripts/upgrade/derby/045-HIVE-16886.derby.sql @@ -0,0 +1 @@ +INSERT INTO "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT "NEXT_EVENT_ID" FROM "APP"."NOTIFICATION_SEQUENCE"); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql index f4cbba6..7b8bef1 100644 --- a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql @@ -111,6 +111,12 @@ CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT, "CHILD_INTEGER_IDX" CREATE TABLE "APP"."METASTORE_DB_PROPERTIES" ("PROPERTY_KEY" VARCHAR(255) NOT NULL, "PROPERTY_VALUE" VARCHAR(1000) NOT NULL, "DESCRIPTION" VARCHAR(1000)); -- ---------------------------------------------- +-- DML Statements +-- ---------------------------------------------- + +INSERT INTO "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") SELECT * FROM (VALUES (1,1)) tmp_table WHERE 
NOT EXISTS ( SELECT "NEXT_EVENT_ID" FROM "APP"."NOTIFICATION_SEQUENCE"); + +-- ---------------------------------------------- -- DDL Statements for indexes -- ---------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql index 01b6f90..756c9c1 100644 --- a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -3,5 +3,6 @@ RUN '041-HIVE-16556.derby.sql'; RUN '042-HIVE-16575.derby.sql'; RUN '043-HIVE-16922.derby.sql'; RUN '044-HIVE-16997.derby.sql'; +RUN '045-HIVE-16886.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/mssql/030-HIVE-16886.mssql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mssql/030-HIVE-16886.mssql.sql b/metastore/scripts/upgrade/mssql/030-HIVE-16886.mssql.sql new file mode 100644 index 0000000..0e91a05 --- /dev/null +++ b/metastore/scripts/upgrade/mssql/030-HIVE-16886.mssql.sql @@ -0,0 +1 @@ +INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT EXISTS (SELECT NEXT_EVENT_ID FROM NOTIFICATION_SEQUENCE); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql index 158fa5a..3c0169d 100644 --- 
a/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql +++ b/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql @@ -595,6 +595,8 @@ CREATE TABLE NOTIFICATION_SEQUENCE ALTER TABLE NOTIFICATION_SEQUENCE ADD CONSTRAINT NOTIFICATION_SEQUENCE_PK PRIMARY KEY (NNI_ID); +INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT EXISTS (SELECT NEXT_EVENT_ID FROM NOTIFICATION_SEQUENCE); + -- Constraints for table MASTER_KEYS for class(es) [org.apache.hadoop.hive.metastore.model.MMasterKey] -- Constraints for table IDXS for class(es) [org.apache.hadoop.hive.metastore.model.MIndex] http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql index 21d62ae..cca8426 100644 --- a/metastore/scripts/upgrade/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql +++ b/metastore/scripts/upgrade/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql @@ -4,6 +4,7 @@ SELECT 'Upgrading MetaStore schema from 2.3.0 to 3.0.0' AS MESSAGE; :r 027-HIVE-16575.mssql.sql :r 028-HIVE-16922.mssql.sql :r 029-HIVE-16997.mssql.sql +:r 030-HIVE-16886.mssql.sql UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS MESSAGE; http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/mysql/045-HIVE-16886.mysql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mysql/045-HIVE-16886.mysql.sql b/metastore/scripts/upgrade/mysql/045-HIVE-16886.mysql.sql new file mode 100644 index 0000000..d4d798d --- /dev/null +++ b/metastore/scripts/upgrade/mysql/045-HIVE-16886.mysql.sql @@ -0,0 +1 @@ +INSERT INTO `NOTIFICATION_SEQUENCE` 
(`NNI_ID`, `NEXT_EVENT_ID`) SELECT 1,1 WHERE NOT EXISTS ( SELECT `NEXT_EVENT_ID` FROM `NOTIFICATION_SEQUENCE`); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql index 98c1077..6091801 100644 --- a/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql +++ b/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql @@ -812,6 +812,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_SEQUENCE` PRIMARY KEY (`NNI_ID`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO `NOTIFICATION_SEQUENCE` (`NNI_ID`, `NEXT_EVENT_ID`) SELECT 1,1 WHERE NOT EXISTS ( SELECT `NEXT_EVENT_ID` FROM `NOTIFICATION_SEQUENCE`); + CREATE TABLE IF NOT EXISTS `KEY_CONSTRAINTS` ( `CHILD_CD_ID` BIGINT, http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql index 9cd3a62..120a1f8 100644 --- a/metastore/scripts/upgrade/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql +++ b/metastore/scripts/upgrade/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql @@ -4,6 +4,7 @@ SOURCE 041-HIVE-16556.mysql.sql; SOURCE 042-HIVE-16575.mysql.sql; SOURCE 043-HIVE-16922.mysql.sql; SOURCE 044-HIVE-16997.mysql.sql; +SOURCE 045-HIVE-16886.mysql.sql; UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS ' '; http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/oracle/045-HIVE-16886.oracle.sql 
---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/oracle/045-HIVE-16886.oracle.sql b/metastore/scripts/upgrade/oracle/045-HIVE-16886.oracle.sql new file mode 100644 index 0000000..9bc9dd5 --- /dev/null +++ b/metastore/scripts/upgrade/oracle/045-HIVE-16886.oracle.sql @@ -0,0 +1 @@ +INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 FROM DUAL WHERE NOT EXISTS ( SELECT NEXT_EVENT_ID FROM NOTIFICATION_SEQUENCE); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql index 7b32d78..79b9efb 100644 --- a/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql +++ b/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql @@ -576,6 +576,7 @@ CREATE TABLE NOTIFICATION_SEQUENCE ALTER TABLE NOTIFICATION_SEQUENCE ADD CONSTRAINT NOTIFICATION_SEQUENCE_PK PRIMARY KEY (NNI_ID); +INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 FROM DUAL WHERE NOT EXISTS ( SELECT NEXT_EVENT_ID FROM NOTIFICATION_SEQUENCE); -- Constraints for table PART_COL_PRIVS for class(es) [org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege] http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql index 6a26649..9b8b162 100644 --- a/metastore/scripts/upgrade/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql +++ b/metastore/scripts/upgrade/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql @@ -4,6 +4,7 @@ SELECT 'Upgrading MetaStore 
schema from 2.3.0 to 3.0.0' AS Status from dual; @042-HIVE-16575.oracle.sql; @043-HIVE-16922.oracle.sql; @044-HIVE-16997.oracle.sql; +@045-HIVE-16886.oracle.sql; UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS Status from dual; http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/postgres/044-HIVE-16886.postgres.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/postgres/044-HIVE-16886.postgres.sql b/metastore/scripts/upgrade/postgres/044-HIVE-16886.postgres.sql new file mode 100644 index 0000000..1c5aea8 --- /dev/null +++ b/metastore/scripts/upgrade/postgres/044-HIVE-16886.postgres.sql @@ -0,0 +1 @@ +INSERT INTO "NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") SELECT 1,1 WHERE NOT EXISTS ( SELECT "NEXT_EVENT_ID" FROM "NOTIFICATION_SEQUENCE"); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql index d2dfdd8..92a2d38 100644 --- a/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql +++ b/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql @@ -589,6 +589,8 @@ CREATE TABLE "NOTIFICATION_SEQUENCE" PRIMARY KEY ("NNI_ID") ); +INSERT INTO "NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") SELECT 1,1 WHERE NOT EXISTS ( SELECT "NEXT_EVENT_ID" FROM "NOTIFICATION_SEQUENCE"); + CREATE TABLE "KEY_CONSTRAINTS" ( "CHILD_CD_ID" BIGINT, http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/scripts/upgrade/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql 
---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql index ee5a673..c4d3d5f 100644 --- a/metastore/scripts/upgrade/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql +++ b/metastore/scripts/upgrade/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql @@ -4,6 +4,7 @@ SELECT 'Upgrading MetaStore schema from 2.3.0 to 3.0.0'; \i 041-HIVE-16575.postgres.sql; \i 042-HIVE-16922.postgres.sql; \i 043-HIVE-16997.postgres.sql; +\i 044-HIVE-16886.postgres.sql; UPDATE "VERSION" SET "SCHEMA_VERSION"='3.0.0', "VERSION_COMMENT"='Hive release version 3.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0'; http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index b122c43..dc1245e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -67,6 +67,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MNotificationLog; +import org.apache.hadoop.hive.metastore.model.MNotificationNextId; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; @@ -136,7 +138,7 @@ class MetaStoreDirectSql { 
this.schema = schema; DatabaseProduct dbType = null; try { - dbType = DatabaseProduct.determineDatabaseProduct(getProductName()); + dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm)); } catch (SQLException e) { LOG.warn("Cannot determine database product; assuming OTHER", e); dbType = DatabaseProduct.OTHER; @@ -190,7 +192,7 @@ class MetaStoreDirectSql { this(pm, conf, ""); } - private String getProductName() { + static String getProductName(PersistenceManager pm) { JDOConnection jdoConn = pm.getDataStoreConnection(); try { return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName(); @@ -222,6 +224,17 @@ class MetaStoreDirectSql { partColumnQuery = pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''"); partColumnQuery.execute(); + /* + these queries for the notification related tables have to be executed so + that the tables are created. This was not required earlier because we were + interacting with these tables via DataNucleus so it would create them if + they did not exist (mostly used in test, schematool should be used for production). + however this has been changed and we used direct SQL + queries via DataNucleus to interact with them now. 
+ */ + pm.newQuery(MNotificationLog.class, "dbName == ''").execute(); + pm.newQuery(MNotificationNextId.class, "nextEventId < -1").execute(); + return true; } catch (Exception ex) { doCommit = false; http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b01e59e..3053dcb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -163,6 +163,9 @@ import org.apache.hadoop.hive.metastore.model.MMetastoreDBProperties; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.thrift.TException; @@ -239,6 +242,7 @@ public class ObjectStore implements RawStore, Configurable { private boolean isInitialized = false; private PersistenceManager pm = null; + private SQLGenerator sqlGenerator = null; private MetaStoreDirectSql directSql = null; private PartitionExpressionProxy expressionProxy = null; private Configuration hiveConf; @@ -427,6 +431,15 @@ public class ObjectStore implements RawStore, Configurable { LOG.info("ObjectStore, initialize called"); prop = dsProps; pm = getPersistenceManager(); + try { + String productName = MetaStoreDirectSql.getProductName(pm); + sqlGenerator = new SQLGenerator( + 
DatabaseProduct.determineDatabaseProduct(productName), + new HiveConf(hiveConf, ObjectStore.class)); + } catch (SQLException e) { + LOG.error("error trying to figure out the database product", e); + throw new RuntimeException(e); + } isInitialized = pm != null; if (isInitialized) { expressionProxy = createExpressionProxy(hiveConf); @@ -8238,7 +8251,7 @@ public class ObjectStore implements RawStore, Configurable { Query query = null; NotificationEventResponse result = new NotificationEventResponse(); - result.setEvents(new ArrayList<NotificationEvent>()); + result.setEvents(new ArrayList<>()); try { openTransaction(); long lastEvent = rqst.getLastEvent(); @@ -8265,29 +8278,98 @@ public class ObjectStore implements RawStore, Configurable { } } + private void lockForUpdate() throws MetaException { + String selectQuery = "select NEXT_EVENT_ID from NOTIFICATION_SEQUENCE"; + String selectForUpdateQuery = sqlGenerator.addForUpdateClause(selectQuery); + new RetryingExecutor(hiveConf, () -> { + Query query = pm.newQuery("javax.jdo.query.SQL", selectForUpdateQuery); + query.setUnique(true); + // only need to execute it to get db Lock + query.execute(); + }).run(); + } + + static class RetryingExecutor { + interface Command { + void process() throws Exception; + } + + private static Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class); + private final int maxRetries; + private final long sleepInterval; + private int currentRetries = 0; + private final Command command; + + RetryingExecutor(Configuration config, Command command) { + this.maxRetries = config.getInt(ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES.name(), + ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES.defaultIntVal); + this.sleepInterval = config.getTimeDuration( + ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL.name(), + ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL.defaultLongVal, + TimeUnit.MILLISECONDS + ); + this.command = command; + } + + public void run() throws MetaException 
{ + while (true) { + try { + command.process(); + break; + } catch (Exception e) { + LOG.info( + "Attempting to acquire the DB log notification lock: " + currentRetries + " out of " + + maxRetries + " retries", e); + if (currentRetries >= maxRetries) { + String message = + "Couldn't acquire the DB log notification lock because we reached the maximum" + + " # of retries: {} retries. If this happens too often, then is recommended to " + + "increase the maximum number of retries on the" + + " hive.notification.sequence.lock.max.retries configuration"; + LOG.error(message, e); + throw new MetaException(message + " :: " + e.getMessage()); + } + currentRetries++; + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e1) { + String msg = "Couldn't acquire the DB notification log lock on " + currentRetries + + " retry, because the following error: "; + LOG.error(msg, e1); + throw new MetaException(msg + e1.getMessage()); + } + } + } + } + } + @Override public void addNotificationEvent(NotificationEvent entry) { boolean commited = false; Query query = null; try { openTransaction(); - query = pm.newQuery(MNotificationNextId.class); - Collection<MNotificationNextId> ids = (Collection) query.execute(); - MNotificationNextId id = null; + lockForUpdate(); + Query objectQuery = pm.newQuery(MNotificationNextId.class); + Collection<MNotificationNextId> ids = (Collection) objectQuery.execute(); + MNotificationNextId mNotificationNextId = null; boolean needToPersistId; if (ids == null || ids.size() == 0) { - id = new MNotificationNextId(1L); + mNotificationNextId = new MNotificationNextId(1L); needToPersistId = true; } else { - id = ids.iterator().next(); + mNotificationNextId = ids.iterator().next(); needToPersistId = false; } - entry.setEventId(id.getNextEventId()); - id.incrementEventId(); - if (needToPersistId) - pm.makePersistent(id); + entry.setEventId(mNotificationNextId.getNextEventId()); + mNotificationNextId.incrementEventId(); + if (needToPersistId) { + 
pm.makePersistent(mNotificationNextId); + } pm.makePersistent(translateThriftToDb(entry)); commited = commitTransaction(); + } catch (Exception e) { + LOG.error("couldnot get lock for update", e); } finally { rollbackAndCleanup(commited, query); } http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java new file mode 100644 index 0000000..0c0bfef --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -0,0 +1,172 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.tools; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.DatabaseProduct; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Helper class that generates SQL queries with syntax specific to target DB + * todo: why throw MetaException? + */ +@VisibleForTesting +public final class SQLGenerator { + static final private Logger LOG = LoggerFactory.getLogger(SQLGenerator.class.getName()); + private final DatabaseProduct dbProduct; + private final HiveConf conf; + + public SQLGenerator(DatabaseProduct dbProduct, HiveConf conf) { + this.dbProduct = dbProduct; + this.conf = conf; + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @return fully formed INSERT INTO ... 
statements + */ + public List<String> createInsertValuesStmt(String tblColumns, List<String> rows) { + if (rows == null || rows.size() == 0) { + return Collections.emptyList(); + } + List<String> insertStmts = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + switch (dbProduct) { + case ORACLE: + if (rows.size() > 1) { + //http://www.oratable.com/oracle-insert-all/ + //https://livesql.oracle.com/apex/livesql/file/content_BM1LJQ87M5CNIOKPOWPV6ZGR3.html + for (int numRows = 0; numRows < rows.size(); numRows++) { + if (numRows % conf + .getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + if (numRows > 0) { + sb.append(" select * from dual"); + insertStmts.add(sb.toString()); + } + sb.setLength(0); + sb.append("insert all "); + } + sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows)) + .append(") "); + } + sb.append("select * from dual"); + insertStmts.add(sb.toString()); + return insertStmts; + } + //fall through + case DERBY: + case MYSQL: + case POSTGRES: + case SQLSERVER: + for (int numRows = 0; numRows < rows.size(); numRows++) { + if (numRows % conf + .getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + if (numRows > 0) { + insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma + } + sb.setLength(0); + sb.append("insert into ").append(tblColumns).append(" values"); + } + sb.append('(').append(rows.get(numRows)).append("),"); + } + insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma + return insertStmts; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + + /** + * Given a {@code selectStatement}, decorate it with FOR UPDATE or semantically equivalent + * construct. If the DB doesn't support it, return the original select. 
+ */ + public String addForUpdateClause(String selectStatement) throws MetaException { + switch (dbProduct) { + case DERBY: + //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html + //sadly in Derby, FOR UPDATE doesn't mean what it should + return selectStatement; + case MYSQL: + //http://dev.mysql.com/doc/refman/5.7/en/select.html + case ORACLE: + //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html + case POSTGRES: + //http://www.postgresql.org/docs/9.0/static/sql-select.html + return selectStatement + " for update"; + case SQLSERVER: + //https://msdn.microsoft.com/en-us/library/ms189499.aspx + //https://msdn.microsoft.com/en-us/library/ms187373.aspx + String modifier = " with (updlock)"; + int wherePos = selectStatement.toUpperCase().indexOf(" WHERE "); + if (wherePos < 0) { + return selectStatement + modifier; + } + return selectStatement.substring(0, wherePos) + modifier + + selectStatement.substring(wherePos, selectStatement.length()); + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + + /** + * Suppose you have a query "select a,b from T" and you want to limit the result set + * to the first 5 rows. The mechanism to do that differs in different DBs. + * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the + * appropriately modified row limiting query. + * <p> + * Note that if {@code noSelectsqlQuery} contains a join, you must make sure that + * all columns are unique for Oracle. 
+ */ + public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { + switch (dbProduct) { + case DERBY: + //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html + return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; + case MYSQL: + //http://www.postgresql.org/docs/7.3/static/queries-limit.html + case POSTGRES: + //https://dev.mysql.com/doc/refman/5.0/en/select.html + return "select " + noSelectsqlQuery + " limit " + numRows; + case ORACLE: + //newer versions (12c and later) support OFFSET/FETCH + return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows; + case SQLSERVER: + //newer versions (2012 and later) support OFFSET/FETCH + //https://msdn.microsoft.com/en-us/library/ms189463.aspx + return "select TOP(" + numRows + ") " + noSelectsqlQuery; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 1887c05..f77900d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; import org.apache.hadoop.hive.metastore.datasource.HikariCPDataSourceProvider; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.commons.dbcp.PoolingDataSource; @@ -3562,139 +3563,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } } - /** - * Helper class that generates SQL queries with syntax specific to target DB - * todo: why throw MetaException? - */ - @VisibleForTesting - static final class SQLGenerator { - private final DatabaseProduct dbProduct; - private final HiveConf conf; - SQLGenerator(DatabaseProduct dbProduct, HiveConf conf) { - this.dbProduct = dbProduct; - this.conf = conf; - } - /** - * Genereates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB - * @param tblColumns e.g. "T(a,b,c)" - * @param rows e.g. list of Strings like 3,4,'d' - * @return fully formed INSERT INTO ... statements - */ - List<String> createInsertValuesStmt(String tblColumns, List<String> rows) { - if(rows == null || rows.size() == 0) { - return Collections.emptyList(); - } - List<String> insertStmts = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - switch (dbProduct) { - case ORACLE: - if(rows.size() > 1) { - //http://www.oratable.com/oracle-insert-all/ - //https://livesql.oracle.com/apex/livesql/file/content_BM1LJQ87M5CNIOKPOWPV6ZGR3.html - for (int numRows = 0; numRows < rows.size(); numRows++) { - if (numRows % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { - if (numRows > 0) { - sb.append(" select * from dual"); - insertStmts.add(sb.toString()); - } - sb.setLength(0); - sb.append("insert all "); - } - sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows)).append(") "); - } - sb.append("select * from dual"); - insertStmts.add(sb.toString()); - return insertStmts; - } - //fall through - case DERBY: - case MYSQL: - case POSTGRES: - case SQLSERVER: - for(int numRows = 0; numRows < rows.size(); numRows++) { - if(numRows % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { - if(numRows > 0) { - insertStmts.add(sb.substring(0, 
sb.length() - 1));//exclude trailing comma - } - sb.setLength(0); - sb.append("insert into ").append(tblColumns).append(" values"); - } - sb.append('(').append(rows.get(numRows)).append("),"); - } - insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma - return insertStmts; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new IllegalStateException(msg); - } - } - /** - * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent - * construct. If the DB doesn't support, return original select. - */ - String addForUpdateClause(String selectStatement) throws MetaException { - switch (dbProduct) { - case DERBY: - //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html - //sadly in Derby, FOR UPDATE doesn't meant what it should - return selectStatement; - case MYSQL: - //http://dev.mysql.com/doc/refman/5.7/en/select.html - case ORACLE: - //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html - case POSTGRES: - //http://www.postgresql.org/docs/9.0/static/sql-select.html - return selectStatement + " for update"; - case SQLSERVER: - //https://msdn.microsoft.com/en-us/library/ms189499.aspx - //https://msdn.microsoft.com/en-us/library/ms187373.aspx - String modifier = " with (updlock)"; - int wherePos = selectStatement.toUpperCase().indexOf(" WHERE "); - if(wherePos < 0) { - return selectStatement + modifier; - } - return selectStatement.substring(0, wherePos) + modifier + - selectStatement.substring(wherePos, selectStatement.length()); - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } - } - /** - * Suppose you have a query "select a,b from T" and you want to limit the result set - * to the first 5 rows. The mechanism to do that differs in different DBs. 
- * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the - * appropriately modified row limiting query. - * - * Note that if {@code noSelectsqlQuery} contains a join, you must make sure that - * all columns are unique for Oracle. - */ - private String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { - switch (dbProduct) { - case DERBY: - //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html - return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; - case MYSQL: - //http://www.postgresql.org/docs/7.3/static/queries-limit.html - case POSTGRES: - //https://dev.mysql.com/doc/refman/5.0/en/select.html - return "select " + noSelectsqlQuery + " limit " + numRows; - case ORACLE: - //newer versions (12c and later) support OFFSET/FETCH - return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows; - case SQLSERVER: - //newer versions (2012 and later) support OFFSET/FETCH - //https://msdn.microsoft.com/en-us/library/ms189463.aspx - return "select TOP(" + numRows + ") " + noSelectsqlQuery; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } - } - } private static class NoPoolConnectionPool implements DataSource { // Note that this depends on the fact that no-one in this class calls anything but http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java index 5448d0d..7882dad 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -1,29 +1,25 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
*/ package org.apache.hadoop.hive.metastore; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import com.codahale.metrics.Counter; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; @@ -34,10 +30,10 @@ import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Role; @@ -49,19 +45,33 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.model.MNotificationLog; +import org.apache.hadoop.hive.metastore.model.MNotificationNextId; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; - import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; - import javax.jdo.Query; +import java.util.Arrays; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestObjectStore { private ObjectStore objectStore = null; @@ -204,7 +214,7 @@ public class TestObjectStore { StorageDescriptor sd1 = new StorageDescriptor(ImmutableList.of(new FieldSchema("pk_col", "double", null)), "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, null); - HashMap<String,String> params = new HashMap<String,String>(); + HashMap<String, String> params = new HashMap<>(); params.put("EXTERNAL", "false"); Table tbl1 = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd1, null, params, null, null, "MANAGED_TABLE"); objectStore.createTable(tbl1); @@ -273,13 +283,13 @@ public class TestObjectStore { Database db1 = new Database(DB1, "description", "locationurl", null); objectStore.createDatabase(db1); StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, null); - HashMap<String,String> tableParams = new HashMap<String,String>(); + HashMap<String, String> tableParams = new HashMap<>(); tableParams.put("EXTERNAL", "false"); FieldSchema partitionKey1 = new FieldSchema("Country", ColumnType.STRING_TYPE_NAME, ""); FieldSchema partitionKey2 = new FieldSchema("State", ColumnType.STRING_TYPE_NAME, ""); Table tbl1 = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd, Arrays.asList(partitionKey1, partitionKey2), tableParams, null, null, "MANAGED_TABLE"); objectStore.createTable(tbl1); - HashMap<String, String> partitionParams = new HashMap<String, String>(); + HashMap<String, String> partitionParams = 
new HashMap<>(); partitionParams.put("PARTITION_LEVEL_PRIVILEGE", "true"); List<String> value1 = Arrays.asList("US", "CA"); Partition part1 = new Partition(value1, DB1, TABLE1, 111, 111, sd, partitionParams); @@ -360,6 +370,10 @@ public class TestObjectStore { HiveConf conf = new HiveConf(); conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true); Metrics.initialize(conf); + conf.setVar(HiveConf.ConfVars.HIVE_CODAHALE_METRICS_REPORTER_CLASSES, + "org.apache.hadoop.hive.common.metrics.metrics2.JsonFileMetricsReporter, " + + "org.apache.hadoop.hive.common.metrics.metrics2.JmxMetricsReporter" + ); // recall setup so that we get an object store with the metrics initalized setUp(); @@ -397,16 +411,16 @@ public class TestObjectStore { Assert.assertEquals(1, directSqlErrors.getCount()); } - public static void dropAllStoreObjects(RawStore store) throws MetaException, InvalidObjectException, InvalidInputException { + private static void dropAllStoreObjects(RawStore store) + throws MetaException, InvalidObjectException, InvalidInputException { try { Deadline.registerIfNot(100000); - List<Function> funcs = store.getAllFunctions(); - for (Function func : funcs) { + List<Function> functions = store.getAllFunctions(); + for (Function func : functions) { store.dropFunction(func.getDbName(), func.getFunctionName()); } List<String> dbs = store.getAllDatabases(); - for (int i = 0; i < dbs.size(); i++) { - String db = dbs.get(i); + for (String db : dbs) { List<String> tbls = store.getAllTables(db); for (String tbl : tbls) { List<Index> indexes = store.getIndexes(db, tbl, 100); @@ -459,4 +473,87 @@ public class TestObjectStore { Mockito.verify(spy, Mockito.times(3)) .rollbackAndCleanup(Mockito.anyBoolean(), Mockito.<Query>anyObject()); } + + @Ignore( + "This test is here to allow testing with other databases like mysql / postgres etc\n" + + " with user changes to the code. 
This cannot be run on apache derby because of\n" + + " https://db.apache.org/derby/docs/10.10/devguide/cdevconcepts842385.html" + ) + @Test + public void testConcurrentAddNotifications() throws ExecutionException, InterruptedException { + + final int NUM_THREADS = 10; + CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_THREADS, + () -> LoggerFactory.getLogger("test") + .debug(NUM_THREADS + " threads going to add notification")); + + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, + MockPartitionExpressionProxy.class.getName()); + /* + Below are the properties that need to be set based on what database this test is going to be run + */ + +// conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); +// conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, +// "jdbc:mysql://localhost:3306/metastore_db"); +// conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, ""); +// conf.setVar(HiveConf.ConfVars.METASTOREPWD, ""); + + /* + we have to add this one manually as for tests the db is initialized via the metastoreDiretSQL + and we don't run the schema creation sql that includes the an insert for notification_sequence + which can be locked. the entry in notification_sequence happens via notification_event insertion. 
+ */ + objectStore.getPersistenceManager().newQuery(MNotificationLog.class, "eventType==''").execute(); + objectStore.getPersistenceManager().newQuery(MNotificationNextId.class, "nextEventId==-1").execute(); + + objectStore.addNotificationEvent( + new NotificationEvent(0, 0, + EventMessage.EventType.CREATE_DATABASE.toString(), + "CREATE DATABASE DB initial")); + + ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + final int n = i; + + executorService.execute( + () -> { + ObjectStore store = new ObjectStore(); + store.setConf(conf); + + String eventType = EventMessage.EventType.CREATE_DATABASE.toString(); + NotificationEvent dbEvent = + new NotificationEvent(0, 0, eventType, + "CREATE DATABASE DB" + n); + System.out.println("ADDING NOTIFICATION"); + + try { + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + store.addNotificationEvent(dbEvent); + System.out.println("FINISH NOTIFICATION"); + }); + } + executorService.shutdown(); + assertTrue(executorService.awaitTermination(15, TimeUnit.SECONDS)); + + // we have to setup this again as the underlying PMF keeps getting reinitialized with original + // reference closed + ObjectStore store = new ObjectStore(); + store.setConf(conf); + + NotificationEventResponse eventResponse = store.getNextNotification( + new NotificationEventRequest()); + assertEquals(NUM_THREADS + 1, eventResponse.getEventsSize()); + long previousId = 0; + for (NotificationEvent event : eventResponse.getEvents()) { + assertTrue("previous:" + previousId + " current:" + event.getEventId(), + previousId < event.getEventId()); + assertTrue(previousId + 1 == event.getEventId()); + previousId = event.getEventId(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/92f9d8fb/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java 
---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java index 4c3b824..1497c00 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.DatabaseProduct; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -150,8 +151,8 @@ public class TestTxnUtils { @Test public void testSQLGenerator() throws Exception { //teseted on Oracle Database 11g Express Edition Release 11.2.0.2.0 - 64bit Production - TxnHandler.SQLGenerator sqlGenerator = - new TxnHandler.SQLGenerator(DatabaseProduct.ORACLE, conf); + SQLGenerator sqlGenerator = + new SQLGenerator(DatabaseProduct.ORACLE, conf); List<String> rows = new ArrayList<>(); rows.add("'yellow', 1"); List<String> sql = sqlGenerator.createInsertValuesStmt("colors(name, category)", rows); @@ -173,7 +174,7 @@ public class TestTxnUtils { Assert.assertEquals("Wrong stmt", "insert all into colors(name, category) values('G',997) into colors(name, category) values('G',998) into colors(name, category) values('G',999) select * from dual", sql.get(1)); sqlGenerator = - new TxnHandler.SQLGenerator(DatabaseProduct.MYSQL, conf); + new SQLGenerator(DatabaseProduct.MYSQL, conf); rows.clear(); rows.add("'yellow', 1"); sql = sqlGenerator.createInsertValuesStmt("colors(name, category)", rows); @@ -192,7 +193,7 @@ public class TestTxnUtils { Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('yellow', 1),('red', 2),('orange', 
3),('G',0),('G',1),('G',2),('G',3),('G',4),('G',5),('G',6),('G',7),('G',8),('G',9),('G',10),('G',11),('G',12),('G',13),('G',14),('G',15),('G',16),('G',17),('G',18),('G',19),('G',20),('G',21),('G',22),('G',23),('G',24),('G',25),('G',26),('G',27),('G',28),('G',29),('G',30),('G',31),('G',32),('G',33),('G',34),('G',35),('G',36),('G',37),('G',38),('G',39),('G',40),('G',41),('G',42),('G',43),('G',44),('G',45),('G',46),('G',47),('G',48),('G',49),('G',50),('G',51),('G',52),('G',53),('G',54),('G',55),('G',56),('G',57),('G',58),('G',59),('G',60),('G',61),('G',62),('G',63),('G',64),('G',65),('G',66),('G',67),('G',68),('G',69),('G',70),('G',71),('G',72),('G',73),('G',74),('G',75),('G',76),('G',77),('G',78),('G',79),('G',80),('G',81),('G',82),('G',83),('G',84),('G',85),('G',86),('G',87),('G',88),('G',89),('G',90),('G',91),('G',92),('G',93),('G',94),('G',95),('G',96),('G',97),('G',9 8),('G',99),('G',100),('G',101),('G',102),('G',103),('G',104),('G',105),('G',106),('G',107),('G',108),('G',109),('G',110),('G',111),('G',112),('G',113),('G',114),('G',115),('G',116),('G',117),('G',118),('G',119),('G',120),('G',121),('G',122),('G',123),('G',124),('G',125),('G',126),('G',127),('G',128),('G',129),('G',130),('G',131),('G',132),('G',133),('G',134),('G',135),('G',136),('G',137),('G',138),('G',139),('G',140),('G',141),('G',142),('G',143),('G',144),('G',145),('G',146),('G',147),('G',148),('G',149),('G',150),('G',151),('G',152),('G',153),('G',154),('G',155),('G',156),('G',157),('G',158),('G',159),('G',160),('G',161),('G',162),('G',163),('G',164),('G',165),('G',166),('G',167),('G',168),('G',169),('G',170),('G',171),('G',172),('G',173),('G',174),('G',175),('G',176),('G',177),('G',178),('G',179),('G',180),('G',181),('G',182),('G',183),('G',184),('G',185),('G',186),('G',187),('G',188),('G',189),('G',190),('G',191),('G',192),('G',193),('G',194),('G',195),('G',196),('G',197),('G', 
198),('G',199),('G',200),('G',201),('G',202),('G',203),('G',204),('G',205),('G',206),('G',207),('G',208),('G',209),('G',210),('G',211),('G',212),('G',213),('G',214),('G',215),('G',216),('G',217),('G',218),('G',219),('G',220),('G',221),('G',222),('G',223),('G',224),('G',225),('G',226),('G',227),('G',228),('G',229),('G',230),('G',231),('G',232),('G',233),('G',234),('G',235),('G',236),('G',237),('G',238),('G',239),('G',240),('G',241),('G',242),('G',243),('G',244),('G',245),('G',246),('G',247),('G',248),('G',249),('G',250),('G',251),('G',252),('G',253),('G',254),('G',255),('G',256),('G',257),('G',258),('G',259),('G',260),('G',261),('G',262),('G',263),('G',264),('G',265),('G',266),('G',267),('G',268),('G',269),('G',270),('G',271),('G',272),('G',273),('G',274),('G',275),('G',276),('G',277),('G',278),('G',279),('G',280),('G',281),('G',282),('G',283),('G',284),('G',285),('G',286),('G',287),('G',288),('G',289),('G',290),('G',291),('G',292),('G',293),('G',294),('G',295),('G',296),('G',297),(' G',298),('G',299),('G',300),('G',301),('G',302),('G',303),('G',304),('G',305),('G',306),('G',307),('G',308),('G',309),('G',310),('G',311),('G',312),('G',313),('G',314),('G',315),('G',316),('G',317),('G',318),('G',319),('G',320),('G',321),('G',322),('G',323),('G',324),('G',325),('G',326),('G',327),('G',328),('G',329),('G',330),('G',331),('G',332),('G',333),('G',334),('G',335),('G',336),('G',337),('G',338),('G',339),('G',340),('G',341),('G',342),('G',343),('G',344),('G',345),('G',346),('G',347),('G',348),('G',349),('G',350),('G',351),('G',352),('G',353),('G',354),('G',355),('G',356),('G',357),('G',358),('G',359),('G',360),('G',361),('G',362),('G',363),('G',364),('G',365),('G',366),('G',367),('G',368),('G',369),('G',370),('G',371),('G',372),('G',373),('G',374),('G',375),('G',376),('G',377),('G',378),('G',379),('G',380),('G',381),('G',382),('G',383),('G',384),('G',385),('G',386),('G',387),('G',388),('G',389),('G',390),('G',391),('G',392),('G',393),('G',394),('G',395),('G',396),('G',397) 
,('G',398),('G',399),('G',400),('G',401),('G',402),('G',403),('G',404),('G',405),('G',406),('G',407),('G',408),('G',409),('G',410),('G',411),('G',412),('G',413),('G',414),('G',415),('G',416),('G',417),('G',418),('G',419),('G',420),('G',421),('G',422),('G',423),('G',424),('G',425),('G',426),('G',427),('G',428),('G',429),('G',430),('G',431),('G',432),('G',433),('G',434),('G',435),('G',436),('G',437),('G',438),('G',439),('G',440),('G',441),('G',442),('G',443),('G',444),('G',445),('G',446),('G',447),('G',448),('G',449),('G',450),('G',451),('G',452),('G',453),('G',454),('G',455),('G',456),('G',457),('G',458),('G',459),('G',460),('G',461),('G',462),('G',463),('G',464),('G',465),('G',466),('G',467),('G',468),('G',469),('G',470),('G',471),('G',472),('G',473),('G',474),('G',475),('G',476),('G',477),('G',478),('G',479),('G',480),('G',481),('G',482),('G',483),('G',484),('G',485),('G',486),('G',487),('G',488),('G',489),('G',490),('G',491),('G',492),('G',493),('G',494),('G',495),('G',496),('G',4 97),('G',498),('G',499),('G',500),('G',501),('G',502),('G',503),('G',504),('G',505),('G',506),('G',507),('G',508),('G',509),('G',510),('G',511),('G',512),('G',513),('G',514),('G',515),('G',516),('G',517),('G',518),('G',519),('G',520),('G',521),('G',522),('G',523),('G',524),('G',525),('G',526),('G',527),('G',528),('G',529),('G',530),('G',531),('G',532),('G',533),('G',534),('G',535),('G',536),('G',537),('G',538),('G',539),('G',540),('G',541),('G',542),('G',543),('G',544),('G',545),('G',546),('G',547),('G',548),('G',549),('G',550),('G',551),('G',552),('G',553),('G',554),('G',555),('G',556),('G',557),('G',558),('G',559),('G',560),('G',561),('G',562),('G',563),('G',564),('G',565),('G',566),('G',567),('G',568),('G',569),('G',570),('G',571),('G',572),('G',573),('G',574),('G',575),('G',576),('G',577),('G',578),('G',579),('G',580),('G',581),('G',582),('G',583),('G',584),('G',585),('G',586),('G',587),('G',588),('G',589),('G',590),('G',591),('G',592),('G',593),('G',594),('G',595),('G',596),('G 
',597),('G',598),('G',599),('G',600),('G',601),('G',602),('G',603),('G',604),('G',605),('G',606),('G',607),('G',608),('G',609),('G',610),('G',611),('G',612),('G',613),('G',614),('G',615),('G',616),('G',617),('G',618),('G',619),('G',620),('G',621),('G',622),('G',623),('G',624),('G',625),('G',626),('G',627),('G',628),('G',629),('G',630),('G',631),('G',632),('G',633),('G',634),('G',635),('G',636),('G',637),('G',638),('G',639),('G',640),('G',641),('G',642),('G',643),('G',644),('G',645),('G',646),('G',647),('G',648),('G',649),('G',650),('G',651),('G',652),('G',653),('G',654),('G',655),('G',656),('G',657),('G',658),('G',659),('G',660),('G',661),('G',662),('G',663),('G',664),('G',665),('G',666),('G',667),('G',668),('G',669),('G',670),('G',671),('G',672),('G',673),('G',674),('G',675),('G',676),('G',677),('G',678),('G',679),('G',680),('G',681),('G',682),('G',683),('G',684),('G',685),('G',686),('G',687),('G',688),('G',689),('G',690),('G',691),('G',692),('G',693),('G',694),('G',695),('G',696), ('G',697),('G',698),('G',699),('G',700),('G',701),('G',702),('G',703),('G',704),('G',705),('G',706),('G',707),('G',708),('G',709),('G',710),('G',711),('G',712),('G',713),('G',714),('G',715),('G',716),('G',717),('G',718),('G',719),('G',720),('G',721),('G',722),('G',723),('G',724),('G',725),('G',726),('G',727),('G',728),('G',729),('G',730),('G',731),('G',732),('G',733),('G',734),('G',735),('G',736),('G',737),('G',738),('G',739),('G',740),('G',741),('G',742),('G',743),('G',744),('G',745),('G',746),('G',747),('G',748),('G',749),('G',750),('G',751),('G',752),('G',753),('G',754),('G',755),('G',756),('G',757),('G',758),('G',759),('G',760),('G',761),('G',762),('G',763),('G',764),('G',765),('G',766),('G',767),('G',768),('G',769),('G',770),('G',771),('G',772),('G',773),('G',774),('G',775),('G',776),('G',777),('G',778),('G',779),('G',780),('G',781),('G',782),('G',783),('G',784),('G',785),('G',786),('G',787),('G',788),('G',789),('G',790),('G',791),('G',792),('G',793),('G',794),('G',795),('G',79 
6),('G',797),('G',798),('G',799),('G',800),('G',801),('G',802),('G',803),('G',804),('G',805),('G',806),('G',807),('G',808),('G',809),('G',810),('G',811),('G',812),('G',813),('G',814),('G',815),('G',816),('G',817),('G',818),('G',819),('G',820),('G',821),('G',822),('G',823),('G',824),('G',825),('G',826),('G',827),('G',828),('G',829),('G',830),('G',831),('G',832),('G',833),('G',834),('G',835),('G',836),('G',837),('G',838),('G',839),('G',840),('G',841),('G',842),('G',843),('G',844),('G',845),('G',846),('G',847),('G',848),('G',849),('G',850),('G',851),('G',852),('G',853),('G',854),('G',855),('G',856),('G',857),('G',858),('G',859),('G',860),('G',861),('G',862),('G',863),('G',864),('G',865),('G',866),('G',867),('G',868),('G',869),('G',870),('G',871),('G',872),('G',873),('G',874),('G',875),('G',876),('G',877),('G',878),('G',879),('G',880),('G',881),('G',882),('G',883),('G',884),('G',885),('G',886),('G',887),('G',888),('G',889),('G',890),('G',891),('G',892),('G',893),('G',894),('G',895),('G' ,896),('G',897),('G',898),('G',899),('G',900),('G',901),('G',902),('G',903),('G',904),('G',905),('G',906),('G',907),('G',908),('G',909),('G',910),('G',911),('G',912),('G',913),('G',914),('G',915),('G',916),('G',917),('G',918),('G',919),('G',920),('G',921),('G',922),('G',923),('G',924),('G',925),('G',926),('G',927),('G',928),('G',929),('G',930),('G',931),('G',932),('G',933),('G',934),('G',935),('G',936),('G',937),('G',938),('G',939),('G',940),('G',941),('G',942),('G',943),('G',944),('G',945),('G',946),('G',947),('G',948),('G',949),('G',950),('G',951),('G',952),('G',953),('G',954),('G',955),('G',956),('G',957),('G',958),('G',959),('G',960),('G',961),('G',962),('G',963),('G',964),('G',965),('G',966),('G',967),('G',968),('G',969),('G',970),('G',971),('G',972),('G',973),('G',974),('G',975),('G',976),('G',977),('G',978),('G',979),('G',980),('G',981),('G',982),('G',983),('G',984),('G',985),('G',986),('G',987),('G',988),('G',989),('G',990),('G',991),('G',992),('G',993),('G',994),('G',995),( 
'G',996)", sql.get(0)); Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('G',997),('G',998),('G',999)", sql.get(1)); - sqlGenerator = new TxnHandler.SQLGenerator(DatabaseProduct.SQLSERVER, conf); + sqlGenerator = new SQLGenerator(DatabaseProduct.SQLSERVER, conf); String modSql = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID"); Assert.assertEquals("select nl_next from NEXT_LOCK_ID with (updlock)", modSql); modSql = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1='CheckLock' and MT_KEY2=0");