This is an automated email from the ASF dual-hosted git repository. pravin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new e8ba31a HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha) e8ba31a is described below commit e8ba31a7dcc35cc5744423614ed7474876dc563a Author: Ayush Saxena <ayushsax...@apache.org> AuthorDate: Fri Nov 19 23:45:03 2021 +0530 HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha) --- .../hcatalog/listener/DbNotificationListener.java | 34 ++++++++++ .../ql/parse/TestReplWithJsonMessageFormat.java | 1 + .../hive/ql/parse/TestReplicationScenarios.java | 72 ++++++++++++++++++++-- .../hadoop/hive/metastore/conf/MetastoreConf.java | 3 + 4 files changed, 106 insertions(+), 4 deletions(-) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 4f442ce..7980d53 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -142,6 +142,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL; /** * An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that @@ -254,6 +255,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener cleaner.setCleanupInterval(MetastoreConf.getTimeVar(getConf(), MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS)); } + + if (key.equals(EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.toString()) || key + .equals(EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.getHiveName())) { + cleaner.setWaitInterval(MetastoreConf + .getTimeVar(getConf(), EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, + TimeUnit.MILLISECONDS)); + } } /** @@ -1406,6 +1414,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener private final RawStore rs; private int ttl; private long sleepTime; + private long waitInterval; + private boolean isInTest; CleanerThread(Configuration conf, RawStore rs) { super("DB-Notification-Cleaner"); @@ -1413,14 +1423,34 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener this.rs = Objects.requireNonNull(rs); boolean isReplEnabled = MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED); + isInTest = conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, false); ConfVars ttlConf = (isReplEnabled) ? ConfVars.REPL_EVENT_DB_LISTENER_TTL : ConfVars.EVENT_DB_LISTENER_TTL; setTimeToLive(MetastoreConf.getTimeVar(conf, ttlConf, TimeUnit.SECONDS)); setCleanupInterval( MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS)); + setWaitInterval(MetastoreConf + .getTimeVar(conf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, TimeUnit.MILLISECONDS)); } @Override public void run() { + LOG.info("Wait interval is {}", waitInterval); + if (waitInterval > 0) { + try { + LOG.info("Cleaner Thread Restarted and {} or {} is configured. So cleaner thread will startup post waiting " + + "{} ms", EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, + EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.getHiveName(), waitInterval); + Thread.sleep(waitInterval); + } catch (InterruptedException e) { + LOG.error("Failed during the initial wait before start.", e); + if(isInTest) { + Thread.currentThread().interrupt(); + } + return; + } + LOG.info("Completed Cleaner thread initial wait. Starting normal processing."); + } + while (true) { LOG.debug("Cleaner thread running"); try { @@ -1448,5 +1478,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener public void setCleanupInterval(long configInterval) { sleepTime = configInterval; } + + public void setWaitInterval(long waitInterval) { + this.waitInterval = waitInterval; + } } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java index 19a56de..dc22be2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java @@ -36,6 +36,7 @@ public class TestReplWithJsonMessageFormat extends TestReplicationScenarios { new ReplicationV1CompatRule(metaStoreClient, hconf, new ArrayList<String>() {{ add("testEventFilters"); add("testReplConfiguredCleanupOfNotificationEvents"); + add("testCleanerThreadStartupWait"); }}); @BeforeClass diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 62f1dd9..61f61a8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -137,6 +137,8 @@ import org.apache.logging.log4j.LogManager; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL; +import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; @@ -252,6 +254,8 @@ public class TestReplicationScenarios { metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror); PersistenceManagerProvider.setTwoMetastoreTesting(true); + MetastoreConf.setTimeVar(hconf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0, TimeUnit.SECONDS); + MetastoreConf.setTimeVar(hconfMirror, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0, TimeUnit.SECONDS); } @AfterClass @@ -2437,7 +2441,7 @@ public class TestReplicationScenarios { // For next run, CM is enabled, set REPL_EVENT_DB_LISTENER_TTL to low value for events to get deleted MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS); - MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds , TimeUnit.SECONDS); + MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds , TimeUnit.SECONDS); DbNotificationListener.resetCleaner(hconf); run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); @@ -2465,7 +2469,7 @@ public class TestReplicationScenarios { // First check with high ttl MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false); MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS); - MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS); + MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS); DbNotificationListener.resetCleaner(hconf); run("CREATE TABLE " + dbName @@ -2491,7 +2495,7 @@ public class TestReplicationScenarios { //With CM disabled, set a low ttl for events to get deleted MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false); MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS); - MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS); + MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS); DbNotificationListener.resetCleaner(hconf); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName @@ -2514,13 +2518,73 @@ public class TestReplicationScenarios { //restore original values MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, true); MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 86400, TimeUnit.SECONDS); - MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS); + MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS); MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 7200, TimeUnit.SECONDS); DbNotificationListener.resetCleaner(hconf); verifySetupSteps = verifySetupOriginal; } @Test + public void testCleanerThreadStartupWait() throws Exception { + int eventsTtl = 20; + HiveConf newConf = new HiveConf(hconf); + + // Set TTL short enough for testing. + MetastoreConf.setTimeVar(newConf, REPL_EVENT_DB_LISTENER_TTL, eventsTtl, TimeUnit.SECONDS); + + // Set startup wait interval. + MetastoreConf.setTimeVar(newConf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, eventsTtl * 5, TimeUnit.SECONDS); + + // Set cleaner wait interval. + MetastoreConf + .setTimeVar(newConf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 10, TimeUnit.MILLISECONDS); + newConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true); + // Reset Cleaner to have a initial wait time. + DbNotificationListener.resetCleaner(newConf); + + IMetaStoreClient msClient = metaStoreClient; + run("create database referenceDb", driver); + + long firstEventId = msClient.getCurrentNotificationEventId().getEventId();; + + run("create database cleanupStartup", driver); + run("drop database cleanupStartup", driver); + + LOG.info("Done with creating events."); + + // Check events are pushed into notification logs. + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, rsp.getEventsSize()); + + // Reset Cleaner to have a initial wait time. + DbNotificationListener.resetCleaner(newConf); + + // Sleep for eventsTtl time and see if events are there. + Thread.sleep(eventsTtl * 1000); + + // Check events are there in notification logs. + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, rsp.getEventsSize()); + + // Sleep for some more time and see if events are there. + Thread.sleep(eventsTtl * 1000); + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, rsp.getEventsSize()); + + // Sleep more than the initial wait time and see if Events get cleaned up post that + Thread.sleep(eventsTtl * 4000); + + // Events should have cleaned up. + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(0, rsp.getEventsSize()); + + // Reset with original configuration. + DbNotificationListener.resetCleaner(hconf); + run("drop database referenceDb", driver); + } + + @Test public void testIncrementalInsertToPartition() throws IOException { String testName = "incrementalInsertToPartition"; String dbName = createDB(testName, driver); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index e42493b..1738ab4 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -752,6 +752,9 @@ public class MetastoreConf { EVENT_DB_LISTENER_CLEAN_INTERVAL("metastore.event.db.listener.clean.interval", "hive.metastore.event.db.listener.clean.interval", 7200, TimeUnit.SECONDS, "sleep interval between each run for cleanup of events from the database listener queue"), + EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL("metastore.event.db.listener.clean.startup.wait.interval", + "hive.metastore.event.db.listener.clean.startup.wait.interval", 1, TimeUnit.DAYS, + "Wait interval post start of metastore after which the cleaner thread starts to work"), EVENT_DB_NOTIFICATION_API_AUTH("metastore.metastore.event.db.notification.api.auth", "hive.metastore.event.db.notification.api.auth", true, "Should metastore do authorization against database notification related APIs such as get_next_notification.\n" +