This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git

commit 849b867edf751757637cdce74bdb8cfd2075c595
Author: Leonard Xu <xbjt...@163.com>
AuthorDate: Tue Nov 17 10:03:52 2020 +0800

    [FLINK-19863][tests][hbase] Harden HBase end-to-end tests
    
    This commit checks that the HBase processes and data dir have been cleaned 
up after shutting down an HBase cluster. It also checks that the required 
resources, e.g. ZooKeeper and the HBase meta table, are available when 
starting up a new HBase cluster.
    
    This closes #14032
---
 .../util/hbase/LocalStandaloneHBaseResource.java   | 103 ++++++++++++++-------
 1 file changed, 71 insertions(+), 32 deletions(-)

diff --git 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
index 2478432..d8c03d7 100644
--- 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
+++ 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -46,6 +47,8 @@ public class LocalStandaloneHBaseResource implements 
HBaseResource {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);
 
+       private static final int MAX_RETRIES = 3;
+       private static final int RETRY_INTERVAL_SECONDS = 30;
        private final TemporaryFolder tmp = new TemporaryFolder();
 
        private final DownloadCache downloadCache = DownloadCache.get();
@@ -68,7 +71,7 @@ public class LocalStandaloneHBaseResource implements 
HBaseResource {
                tmp.create();
                downloadCache.before();
 
-               this.hbaseDir = 
tmp.newFolder("hbase").toPath().toAbsolutePath();
+               this.hbaseDir = tmp.newFolder("hbase-" + 
hbaseVersion).toPath().toAbsolutePath();
                setupHBaseDist();
                setupHBaseCluster();
        }
@@ -92,62 +95,98 @@ public class LocalStandaloneHBaseResource implements 
HBaseResource {
        }
 
        private void setupHBaseCluster() throws IOException {
-               LOG.info("Starting HBase cluster");
-               AutoClosableProcess.runBlocking(
-                       hbaseDir.resolve(Paths.get("bin", 
"start-hbase.sh")).toString());
-
-               while (!isHBaseRunning()) {
-                       try {
-                               LOG.info("Waiting for HBase to start");
-                               Thread.sleep(500L);
-                       } catch (InterruptedException e) {
-                               Thread.currentThread().interrupt();
-                               break;
-                       }
-               }
+               LOG.info("Starting HBase cluster...");
+               runHBaseProcessWithRetry("start-hbase.sh", () -> 
!isHMasterRunning());
+               LOG.info("Start HBase cluster success");
        }
 
        @Override
        public void afterTestSuccess() {
+               shutdownResource();
+               downloadCache.afterTestSuccess();
+               tmp.delete();
+       }
+
+       private void shutdownResource() {
+               LOG.info("Stopping HBase Cluster...");
                try {
-                       LOG.info("Stopping HBase Cluster");
-                       AutoClosableProcess.runBlocking(
-                               hbaseDir.resolve(Paths.get("bin", 
"hbase-daemon.sh")).toString(),
-                               "stop",
-                               "master");
+                       runHBaseProcessWithRetry("stop-hbase.sh", () -> 
isHMasterAlive());
+               } catch (IOException ioe) {
+                       LOG.warn("Error when shutting down HBase Cluster.", 
ioe);
+               }
+               LOG.info("Stop HBase Cluster success");
+       }
+
+       private void runHBaseProcessWithRetry(String command, Supplier<Boolean> 
processStatusChecker) throws IOException {
+               LOG.info("Execute {} for HBase Cluster", command);
 
-                       while (isHBaseRunning()) {
+               for (int i = 1; i <= MAX_RETRIES; i++) {
+                       try {
+                               AutoClosableProcess.runBlocking(
+                                               
hbaseDir.resolve(Paths.get("bin", command)).toString());
+                       } catch (IOException ioe) {
+                               LOG.warn("Get exception when execute {} ", 
command, ioe);
+                       }
+
+                       int waitSecond = 0;
+                       while (processStatusChecker.get()) {
                                try {
-                                       LOG.info("Waiting for HBase to stop");
-                                       Thread.sleep(500L);
+                                       LOG.info("Waiting for HBase {} works", 
command);
+                                       Thread.sleep(1000L);
                                } catch (InterruptedException e) {
-                                       Thread.currentThread().interrupt();
+                                       LOG.warn("sleep interrupted", e);
+                               }
+                               waitSecond++;
+                               if (waitSecond > RETRY_INTERVAL_SECONDS) {
                                        break;
                                }
                        }
 
+                       if (waitSecond < RETRY_INTERVAL_SECONDS) {
+                               break;
+                       } else {
+                               if (i == MAX_RETRIES) {
+                                       LOG.error("Execute {} failed, retry 
times {}", command, i);
+                                       throw new 
IllegalArgumentException(String.format(
+                                                       "Execute %s failed 
aftert retry %s times", command, i));
+                               } else {
+                                       LOG.warn("Execute {} failed, retry 
times {}", command, i);
+                               }
+                       }
+               }
+       }
+
+       private boolean isHMasterRunning() {
+               try {
+                       final AtomicBoolean atomicHMasterStarted = new 
AtomicBoolean(false);
+                       queryHBaseStatus(line ->
+                                       
atomicHMasterStarted.compareAndSet(false, line.contains("hbase:namespace")));
+                       return atomicHMasterStarted.get();
                } catch (IOException ioe) {
-                       LOG.warn("Error while shutting down hbase.", ioe);
+                       return false;
                }
-               downloadCache.afterTestSuccess();
-               tmp.delete();
        }
 
-       private static boolean isHBaseRunning() {
+       private void queryHBaseStatus(final Consumer<String> stdoutProcessor) 
throws IOException {
+               executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
+       }
+
+       private boolean isHMasterAlive() {
                try {
                        final AtomicBoolean atomicHMasterStarted = new 
AtomicBoolean(false);
-                       queryHMasterStatus(line -> 
atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
+                       queryHBaseProcess(line ->
+                                       
atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
                        return atomicHMasterStarted.get();
                } catch (IOException ioe) {
                        return false;
                }
        }
 
-       private static void queryHMasterStatus(final Consumer<String> 
stdoutProcessor) throws IOException {
+       private void queryHBaseProcess(final Consumer<String> stdoutProcessor) 
throws IOException {
                AutoClosableProcess
-                       .create("jps")
-                       .setStdoutProcessor(stdoutProcessor)
-                       .runBlocking();
+                               .create("jps")
+                               .setStdoutProcessor(stdoutProcessor)
+                               .runBlocking();
        }
 
        @Override

Reply via email to