HDFS-13198. RBF: RouterHeartbeatService throws out CachedStateStore related 
exceptions when starting router. Contributed by Wei Yan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/41fc7f80
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/41fc7f80
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/41fc7f80

Branch: refs/heads/HDFS-7240
Commit: 41fc7f80beb32f4a1fac73faf7c9116ad5ee3420
Parents: 2626ec3
Author: Inigo Goiri <inigo...@apache.org>
Authored: Wed Mar 14 13:14:36 2018 -0700
Committer: Inigo Goiri <inigo...@apache.org>
Committed: Wed Mar 14 13:14:36 2018 -0700

----------------------------------------------------------------------
 .../router/RouterHeartbeatService.java          |  18 ++-
 .../driver/impl/StateStoreZooKeeperImpl.java    |  11 +-
 .../router/TestRouterHeartbeatService.java      | 147 +++++++++++++++++++
 3 files changed, 172 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/41fc7f80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
index 86a6210..6e44984 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
@@ -75,14 +76,15 @@ public class RouterHeartbeatService extends PeriodicService 
{
   /**
    * Update the state of the Router in the State Store.
    */
-  private synchronized void updateStateStore() {
+  @VisibleForTesting
+  synchronized void updateStateStore() {
     String routerId = router.getRouterId();
     if (routerId == null) {
       LOG.error("Cannot heartbeat for router: unknown router id");
       return;
     }
-    RouterStore routerStore = router.getRouterStateManager();
-    if (routerStore != null) {
+    if (isStoreAvailable()) {
+      RouterStore routerStore = router.getRouterStateManager();
       try {
         RouterState record = RouterState.newInstance(
             routerId, router.getStartTime(), router.getRouterState());
@@ -152,4 +154,14 @@ public class RouterHeartbeatService extends 
PeriodicService {
   public void periodicInvoke() {
     updateStateStore();
   }
+
+  private boolean isStoreAvailable() {
+    if (router.getRouterStateManager() == null) {
+      return false;
+    }
+    if (router.getStateStore() == null) {
+      return false;
+    }
+    return router.getStateStore().isDriverReady();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41fc7f80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 69b9b98..7f98171 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
@@ -111,7 +113,14 @@ public class StateStoreZooKeeperImpl extends 
StateStoreSerializableImpl {
 
   @Override
   public boolean isDriverReady() {
-    return zkManager != null;
+    if (zkManager == null) {
+      return false;
+    }
+    CuratorFramework curator = zkManager.getCurator();
+    if (curator == null) {
+      return false;
+    }
+    return curator.getState() == CuratorFrameworkState.STARTED;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41fc7f80/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
new file mode 100644
index 0000000..427d514
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterHeartbeatService.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import 
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import 
org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for router heartbeat service.
+ */
+public class TestRouterHeartbeatService {
+  private Router router;
+  private final String routerId = "router1";
+  private TestingServer testingServer;
+  private CuratorFramework curatorFramework;
+
+  @Before
+  public void setup() throws Exception {
+    router = new Router();
+    router.setRouterId(routerId);
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
+    Configuration routerConfig =
+        new RouterConfigBuilder(conf).stateStore().build();
+    routerConfig.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+        TimeUnit.HOURS.toMillis(1));
+    routerConfig.setClass(DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        StateStoreZooKeeperImpl.class, StateStoreDriver.class);
+
+    testingServer = new TestingServer();
+    String connectStr = testingServer.getConnectString();
+    curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(connectStr)
+        .retryPolicy(new RetryNTimes(100, 100))
+        .build();
+    curatorFramework.start();
+    routerConfig.set(CommonConfigurationKeys.ZK_ADDRESS, connectStr);
+    router.init(routerConfig);
+    router.start();
+
+
+    waitStateStore(router.getStateStore(), TimeUnit.SECONDS.toMicros(10));
+  }
+
+  @Test
+  public void testStateStoreUnavailable() throws IOException {
+    curatorFramework.close();
+    testingServer.stop();
+    router.getStateStore().stop();
+    // The driver is not ready
+    assertFalse(router.getStateStore().isDriverReady());
+
+    // Do a heartbeat, and no exception thrown out
+    RouterHeartbeatService heartbeatService =
+        new RouterHeartbeatService(router);
+    heartbeatService.updateStateStore();
+  }
+
+  @Test
+  public void testStateStoreAvailable() throws Exception {
+    // The driver is ready
+    StateStoreService stateStore = router.getStateStore();
+    assertTrue(router.getStateStore().isDriverReady());
+    RouterStore routerStore = router.getRouterStateManager();
+
+    // No record about this router
+    stateStore.refreshCaches(true);
+    GetRouterRegistrationRequest request =
+        GetRouterRegistrationRequest.newInstance(routerId);
+    GetRouterRegistrationResponse response =
+        router.getRouterStateManager().getRouterRegistration(request);
+    RouterState routerState = response.getRouter();
+    String id = routerState.getRouterId();
+    StateStoreVersion version = routerState.getStateStoreVersion();
+    assertNull(id);
+    assertNull(version);
+
+    // Do a heartbeat
+    RouterHeartbeatService heartbeatService =
+        new RouterHeartbeatService(router);
+    heartbeatService.updateStateStore();
+
+    // We should have a record
+    stateStore.refreshCaches(true);
+    request = GetRouterRegistrationRequest.newInstance(routerId);
+    response = routerStore.getRouterRegistration(request);
+    routerState = response.getRouter();
+    id = routerState.getRouterId();
+    version = routerState.getStateStoreVersion();
+    assertNotNull(id);
+    assertNotNull(version);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (curatorFramework != null) {
+      curatorFramework.close();
+    }
+    if (testingServer != null) {
+      testingServer.stop();
+    }
+    if (router != null) {
+      router.shutDown();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to