This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new bc35507757 HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko) bc35507757 is described below commit bc35507757c23a6da612a2dc4b840105aed2515c Author: veghlaci05 <90267982+veghlac...@users.noreply.github.com> AuthorDate: Wed Jun 29 13:16:59 2022 +0200 HIVE-26242: Compaction heartbeater improvements (Laszlo Vegh, reviewed by Karen Coppage, Denys Kuzmenko) Closes #3303 --- .../txn/compactor/CompactionHeartbeatService.java | 217 +++++++++++++++++++++ .../ql/txn/compactor/IMetaStoreClientFactory.java | 64 ++++++ .../hadoop/hive/ql/txn/compactor/Worker.java | 84 ++------ .../compactor/TestCompactionHeartbeatService.java | 154 +++++++++++++++ 4 files changed, 446 insertions(+), 73 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java new file mode 100644 index 0000000000..788955e35c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.hive.common.util.HiveStringUtils.SHUTDOWN_HOOK_PRIORITY; + +/** + * Singleton service responsible for heartbeating the compaction transactions. + */ +class CompactionHeartbeatService { + + + private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeatService.class); + + private static volatile CompactionHeartbeatService instance; + + /** + * Return the singleton instance of this class. + * @param conf The {@link HiveConf} used to create the service. Used only during the first call + * @return Returns the singleton {@link CompactionHeartbeatService} + * @throws IllegalStateException Thrown when the service has already been destroyed. 
+ */ + static CompactionHeartbeatService getInstance(HiveConf conf) { + if (instance == null) { + synchronized (CompactionHeartbeatService.class) { + if (instance == null) { + LOG.debug("Initializing compaction txn heartbeater service."); + instance = new CompactionHeartbeatService(conf); + ShutdownHookManager.addShutdownHook(() -> instance.shutdown(), SHUTDOWN_HOOK_PRIORITY); + } + } + } + if (instance.shuttingDown) { + throw new IllegalStateException("CompactionHeartbeatService is already destroyed!"); + } + return instance; + } + + private final ObjectPool<IMetaStoreClient> clientPool; + private volatile boolean shuttingDown = false; + private final long initialDelay; + private final long period; + private final ConcurrentHashMap<Long, CompactionHeartbeater> tasks = new ConcurrentHashMap<>(30); + + /** + * Starts the heartbeat for the given transaction + * @param txnId The id of the compaction txn + * @param lockId The id of the lock associated with the txn + * @param tableName Required for logging only + * @throws IllegalStateException Thrown when the heartbeat for the given txn has already been started. + */ + void startHeartbeat(long txnId, long lockId, String tableName) { + if (shuttingDown) { + throw new IllegalStateException("Service is shutting down, starting new heartbeats is not possible!"); + } + if (tasks.containsKey(txnId)) { + throw new IllegalStateException("Heartbeat was already started for TXN " + txnId); + } + LOG.info("Submitting heartbeat task for TXN {}", txnId); + CompactionHeartbeater heartbeater = new CompactionHeartbeater(txnId, lockId, tableName); + heartbeater.start(); + tasks.put(txnId, heartbeater); + } + + /** + * Stops the heartbeat for the given transaction + * @param txnId The id of the compaction txn + * @throws IllegalStateException Thrown when there is no {@link CompactionHeartbeater} task associated with the + * given txnId. 
+ */ + void stopHeartbeat(long txnId) throws InterruptedException { + LOG.info("Stopping heartbeat task for TXN {}", txnId); + CompactionHeartbeater heartbeater = tasks.get(txnId); + if (heartbeater == null) { + throw new IllegalStateException("No registered heartbeat found for TXN " + txnId); + } + try { + heartbeater.stop(); + } finally { + tasks.remove(txnId); + } + } + + /** + * Shuts down the service, by closing its underlying resources. Be aware that after shutdown this service is no + * longer usable, there is no way to re-initialize it. + */ + void shutdown() { + shuttingDown = true; + LOG.info("Shutting down compaction txn heartbeater service."); + for (CompactionHeartbeater heartbeater : tasks.values()) { + try { + heartbeater.stop(); + } catch (InterruptedException e) { + LOG.warn("Shutdownhook thread was interrupted during shutting down the CompactionHeartbeatService."); + } + } + tasks.clear(); + clientPool.close(); + LOG.info("Compaction txn heartbeater service is successfully stopped."); + } + + private CompactionHeartbeatService(HiveConf conf) { + int numberOfWorkers = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS); + GenericObjectPoolConfig<IMetaStoreClient> config = new GenericObjectPoolConfig<>(); + config.setMinIdle(1); + config.setMaxIdle(2); + config.setMaxTotal(numberOfWorkers); + config.setMaxWaitMillis(2000); + clientPool = new GenericObjectPool<>(new IMetaStoreClientFactory(conf), config); + long txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS); + initialDelay = txnTimeout / 4; + period = txnTimeout / 2; + } + + private final class CompactionHeartbeater { + private final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class); + private final long txnId; + private final long lockId; + private final String tableName; + private final ScheduledThreadPoolExecutor heartbeatExecutor; + + private CompactionHeartbeater(long txnId, long lockId, String 
tableName) { + heartbeatExecutor = new ScheduledThreadPoolExecutor(1); + heartbeatExecutor.setThreadFactory(new ThreadFactoryBuilder() + .setPriority(Thread.MIN_PRIORITY) + .setDaemon(true) + .setNameFormat("CompactionTxnHeartbeater-" + txnId) + .build()); + this.tableName = Objects.requireNonNull(tableName); + this.txnId = txnId; + this.lockId = lockId; + } + + void start() { + heartbeatExecutor.scheduleAtFixedRate(() -> { + IMetaStoreClient msc = null; + try { + LOG.debug("Heartbeating compaction transaction id {} for table: {}", txnId, tableName); + // Create a metastore client for each thread since it is not thread safe + msc = clientPool.borrowObject(); + msc.heartbeat(txnId, lockId); + } catch (NoSuchElementException e) { + LOG.error("Compaction transaction heartbeater pool exhausted, unable to heartbeat", e); + // This heartbeat attempt failed, and there is no client to return to the pool. + return; + } catch (TException e) { + LOG.error("Error while heartbeating compaction transaction id {} for table: {}", txnId, tableName, e); + // Heartbeat failed, but the client is not broken, we can return it to the pool. + } catch (Exception e) { + LOG.error("Error while heartbeating compaction transaction id {} for table: {}", txnId, tableName, e); + // Unknown error, invalidate the client, maybe it is broken. 
+ if (msc != null) { + try { + clientPool.invalidateObject(msc); + } catch (Exception ex) { + LOG.error("Error while invalidating a broken MetaStoreClient instance", e); + } + } + return; + } + try { + if (msc != null) { + clientPool.returnObject(msc); + } + } catch (Exception e) { + LOG.error("Error while returning back to the pool a MetaStoreClient instance", e); + } + }, initialDelay, period, TimeUnit.MILLISECONDS); + } + + public void stop() throws InterruptedException { + LOG.info("Shutting down compaction txn heartbeater instance."); + heartbeatExecutor.shutdownNow(); + if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", txnId); + return; + } + LOG.info("Compaction txn heartbeater instance is successfully stopped."); + } + + } + +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java new file mode 100644 index 0000000000..1fd74ca029 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/IMetaStoreClientFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; + +import java.util.Objects; + +/** + * Factory class responsible for managing (creating/wrapping/destroying) the {@link IMetaStoreClient} instances. + * Used by the {@link org.apache.commons.pool2.ObjectPool} in {@link CompactionHeartbeatService} to allow pooling + * of {@link IMetaStoreClient}s. + */ +final class IMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> { + + private final HiveConf conf; + + @Override + public IMetaStoreClient create() throws Exception { + return HiveMetaStoreUtils.getHiveMetastoreClient(conf); + } + + @Override + public PooledObject<IMetaStoreClient> wrap(IMetaStoreClient msc) { + return new DefaultPooledObject<>(msc); + } + + @Override + public void destroyObject(PooledObject<IMetaStoreClient> msc) { + msc.getObject().close(); + } + + @Override + public boolean validateObject(PooledObject<IMetaStoreClient> msc) { + //Not in use currently, would be good to validate the client at borrowing/returning, but this needs support from + //MetaStoreClient side + return super.validateObject(msc); + } + + public IMetaStoreClientFactory(HiveConf conf) { + this.conf = Objects.requireNonNull(conf); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 69bff17d0a..707ebff393 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -18,9 +18,7 @@ 
package org.apache.hadoop.hive.ql.txn.compactor; import com.google.common.annotations.VisibleForTesting; -import java.util.concurrent.ScheduledExecutorService; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,7 +26,6 @@ import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.DataOperationType; @@ -69,7 +66,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -205,36 +201,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { } } - static final class CompactionHeartbeater implements Runnable { - static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class); - private final CompactionTxn compactionTxn; - private final String tableName; - private final HiveConf conf; - - public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) { - this.tableName = Objects.requireNonNull(tableName); - this.compactionTxn = Objects.requireNonNull(compactionTxn); - this.conf = Objects.requireNonNull(conf); - } - - @Override - public void run() { - LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactionTxn, tableName); - IMetaStoreClient msc = null; - try { - // Create a metastore client for each thread since it is 
not thread safe - msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); - msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId()); - } catch (Exception e) { - LOG.error("Error while heartbeating transaction id {} for table: {}", compactionTxn, tableName, e); - } finally { - if (msc != null) { - msc.close(); - } - } - } - } - /** * Determine if compaction can run in a specified directory. * @param isMajorCompaction type of compaction. @@ -669,13 +635,12 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { /** * Keep track of the compaction's transaction and its operations. */ - private class CompactionTxn implements AutoCloseable { + class CompactionTxn implements AutoCloseable { private long txnId = 0; private long lockId = 0; private TxnStatus status = TxnStatus.UNKNOWN; private boolean succeessfulCompaction = false; - private ScheduledExecutorService heartbeatExecutor; /** * Try to open a new txn. @@ -692,18 +657,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}"); } lockId = res.getLockid(); - heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() - .setPriority(Thread.MIN_PRIORITY) - .setDaemon(true) - .setNameFormat("CompactionTxnHeartbeater-" + txnId) - .build()); - long txnTimeout = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS); - heartbeatExecutor.scheduleAtFixedRate( - new CompactionHeartbeater(this, TxnUtils.getFullTableName(ci.dbname, ci.tableName), conf), - txnTimeout / 4, - txnTimeout / 2, - TimeUnit.MILLISECONDS - ); + CompactionHeartbeatService.getInstance(conf).startHeartbeat(txnId, lockId, TxnUtils.getFullTableName(ci.dbname, ci.tableName)); } /** @@ -718,28 +672,16 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { * @throws Exception */ @Override public void close() throws Exception { - 
//the transaction is about to close, we can stop heartbeating regardless of it's state - shutdownHeartbeater(); - if (status != TxnStatus.UNKNOWN) { - if (succeessfulCompaction) { - commit(); - } else { - abort(); - } - } - } - - private void shutdownHeartbeater() { - if (heartbeatExecutor != null) { - heartbeatExecutor.shutdownNow(); - try { - if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this); + try { + //the transaction is about to close, we can stop heartbeating regardless of it's state + CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId); + } finally { + if (status != TxnStatus.UNKNOWN) { + if (succeessfulCompaction) { + commit(); + } else { + abort(); } - } catch (InterruptedException ex) { - //Caller thread was interrupted while waiting for heartbeater to terminate. - //Nothing to do, just restore the interrupted state. - Thread.currentThread().interrupt(); } } } @@ -748,10 +690,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { return txnId; } - long getLockId() { - return lockId; - } - @Override public String toString() { return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")"; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java new file mode 100644 index 0000000000..99455783da --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionHeartbeatService.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.lang.reflect.Field; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({HiveMetaStoreUtils.class}) +@PowerMockIgnore("javax.management.*") +public class TestCompactionHeartbeatService { + + private static Field HEARTBEAT_SINGLETON; + @Mock + private HiveConf conf; + @Mock + private 
IMetaStoreClient client; + + @BeforeClass + public static void setupClass() throws NoSuchFieldException { + HEARTBEAT_SINGLETON = CompactionHeartbeatService.class.getDeclaredField("instance"); + HEARTBEAT_SINGLETON.setAccessible(true); + } + + @Before + public void setup() throws Exception { + Mockito.when(conf.get(MetastoreConf.ConfVars.TXN_TIMEOUT.getVarname())).thenReturn("100ms"); + Mockito.when(conf.get(MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS.getVarname())).thenReturn("4"); + PowerMockito.mockStatic(HiveMetaStoreUtils.class); + PowerMockito.when(HiveMetaStoreUtils.getHiveMetastoreClient(any())).thenReturn(client); + HEARTBEAT_SINGLETON.set(null,null); + } + + @After + public void tearDown() throws InterruptedException { + CompactionHeartbeatService.getInstance(conf).shutdown(); + } + + @Test + public void testHeartbeat() throws Exception { + CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table"); + Thread.sleep(300); + CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0); + verify(client, atLeast(1)).heartbeat(0,0); + } + + @Test(expected = IllegalStateException.class) + public void testStopHeartbeatForNonExistentTxn() throws InterruptedException { + CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0); + } + + @Test + public void testNoHeartbeatAfterStop() throws Exception { + AtomicBoolean stopped = new AtomicBoolean(false); + doAnswer((Answer<Void>) invocationOnMock -> { + if (stopped.get()) { + Assert.fail("Heartbeat after stopHeartbeat call"); + } + return null; + }).when(client).heartbeat(0,0); + CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table"); + Thread.sleep(200); + CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0); + stopped.set(true); + verify(client, atLeast(1)).heartbeat(0,0); + } + + @Test(expected = IllegalStateException.class) + public void testStartHeartbeatTwice() { + CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table"); + 
CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table"); + } + + @Test + public void testStopHeartbeatAbortedTheThread() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicBoolean heartbeated = new AtomicBoolean(false); + doAnswer((Answer<Void>) invocationOnMock -> { + //make sure we call stopHeartbeat when we are in the middle of the heartbeat call + countDownLatch.countDown(); + Thread.sleep(500); + heartbeated.set(true); + return null; + }).when(client).heartbeat(0,0); + CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table"); + //We try to stop heartbeating while it's in the middle of a heartbeat + countDownLatch.await(); + CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0); + Assert.assertFalse(heartbeated.get()); + // Check if heartbeat was done only once despite the timing is 100ms and the first took 500ms + verify(client, times(1)).heartbeat(0,0); + } + + @Test + public void testBadClientInvalidated() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(3); + doAnswer((Answer<Void>) invocationOnMock -> { + countDownLatch.countDown(); + if (countDownLatch.getCount() == 0) { + Thread.sleep(100); + } + throw new RuntimeException(); + }).when(client).heartbeat(0,0); + CompactionHeartbeatService.getInstance(conf).startHeartbeat(0, 0,"table"); + //We stop only after 3 heartbeats + countDownLatch.await(); + CompactionHeartbeatService.getInstance(conf).stopHeartbeat(0); + // Check if bad clients were closed and new ones were requested + verify(client, times(3)).heartbeat(0,0); + verify(client, times(3)).close(); + PowerMockito.verifyStatic(HiveMetaStoreUtils.class, times(3)); + HiveMetaStoreUtils.getHiveMetastoreClient(conf); + } +}