This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8c2059c78130f8138d6295bef27d76ac5a413aaf
Author: Hunter Lee <[email protected]>
AuthorDate: Mon Jul 27 18:36:39 2020 -0700

    Implement throttling for routing data update on cache miss
    
    This commit implements throttling for routing data updates by recording a
    timestamp of the last time the cache was reset in RoutingDataManager. It
    defines a default interval (5 seconds) and makes this interval configurable
    through the "routing.data.update.interval.ms" System Property.
---
 ...PropertyKeys.java => RoutingDataConstants.java} | 14 ++---
 .../constant/RoutingSystemPropertyKeys.java        |  5 ++
 .../zookeeper/impl/client/FederatedZkClient.java   | 38 ++++++++++++
 .../zookeeper/routing/RoutingDataManager.java      | 12 ++++
 .../impl/client/TestFederatedZkClient.java         | 70 +++++++++++++++++++++-
 5 files changed, 128 insertions(+), 11 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
similarity index 66%
copy from zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
copy to zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
index a57075b..164c543 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
@@ -19,15 +19,20 @@ package org.apache.helix.zookeeper.constant;
  * under the License.
  */
 
 /**
- * This class contains various routing-related system property keys for multi-zk clients.
+ * Constants related to routing data management for multi-zk (realm-aware) ZkClients.
  */
-public class RoutingSystemPropertyKeys {
+public class RoutingDataConstants {
 
   /**
-   * If enabled, FederatedZkClient (multiZkClient) will invalidate the cached routing data and
-   * re-read the routing data from the routing data source upon ZK path sharding key cache miss.
+   * Default interval that defines how frequently RoutingDataManager's routing data should be
+   * updated from the routing data source. This exists to apply throttling to the rate at which
+   * the ZkClient pulls routing data from the routing data source to avoid overloading the routing
+   * data source.
    */
-  public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS =
-      "update.routing.data.on.cache.miss.enabled";
+  public static final long DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS = 5 * 1000L; // 5 seconds
+
+  private RoutingDataConstants() {
+    // Holds constants only; not meant to be instantiated.
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
index a57075b..e22ad08 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
@@ -30,4 +30,14 @@ public class RoutingSystemPropertyKeys {
    */
   public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS =
       "update.routing.data.on.cache.miss.enabled";
+
+  /**
+   * System property key for the minimum interval, in milliseconds, between routing data updates
+   * that FederatedZkClient pulls from the routing data source upon a sharding key cache miss.
+   * FederatedZkClient reads this property once, when it is constructed. If the property is unset,
+   * cannot be parsed as a long, or is negative,
+   * {@link RoutingDataConstants#DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS} is used instead.
+   * A value of 0 effectively disables throttling.
+   */
+  public static final String ROUTING_DATA_UPDATE_INTERVAL_MS = "routing.data.update.interval.ms";
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 4354537..dc55d53 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -30,6 +30,7 @@ import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.RoutingDataConstants;
 import org.apache.helix.zookeeper.constant.RoutingSystemPropertyKeys;
 import org.apache.helix.zookeeper.exception.MultiZkException;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -86,6 +87,8 @@ public class FederatedZkClient implements RealmAwareZkClient {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private final boolean _routingDataUpdateOnCacheMissEnabled = Boolean.parseBoolean(
       System.getProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS));
+  // Minimum interval (ms) between cache-miss-triggered routing data updates; set in constructor
+  private long _routingDataUpdateInterval;
 
   // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
   public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
@@ -102,6 +105,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
     _clientConfig = clientConfig;
     _pathBasedZkSerializer = clientConfig.getZkSerializer();
     _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+    _routingDataUpdateInterval = resolveRoutingDataUpdateInterval();
   }
 
   @Override
@@ -587,6 +591,11 @@ public class FederatedZkClient implements RealmAwareZkClient {
               try {
                 zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
               } catch (NoSuchElementException e4) {
+                if (shouldThrottleRead()) {
+                  // If routing data update from routing data source has taken place recently,
+                  // then just skip the update and throw the exception
+                  throw e4;
+                }
                 // Try 2) Reset RoutingDataManager and re-read the routing data from routing data
                 // source via I/O. Since RoutingDataManager's cache doesn't have it either, so we
                 // synchronize on all threads by locking on FederatedZkClient.class.
@@ -626,4 +635,56 @@ public class FederatedZkClient implements RealmAwareZkClient {
             + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY
             + " to create a dedicated RealmAwareZkClient for this operation.");
   }
+
+  /**
+   * Resolves the routing data update interval from the
+   * {@link RoutingSystemPropertyKeys#ROUTING_DATA_UPDATE_INTERVAL_MS} System Property.
+   * Falls back to {@link RoutingDataConstants#DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS} if the
+   * property is unset, cannot be parsed as a long, or is negative. Only the latter two cases log a
+   * warning, because leaving the property unset is the normal configuration.
+   *
+   * @return the routing data update interval in milliseconds; never negative
+   */
+  private long resolveRoutingDataUpdateInterval() {
+    final long defaultInterval = RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS;
+    final String configuredValue =
+        System.getProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+    if (configuredValue == null) {
+      return defaultInterval;
+    }
+
+    final long interval;
+    try {
+      interval = Long.parseLong(configuredValue);
+    } catch (NumberFormatException e) {
+      LOG.warn("FederatedZkClient::resolveRoutingDataUpdateInterval(): failed to parse {}={}, "
+              + "using the default value ({} ms) instead!",
+          RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, configuredValue,
+          defaultInterval, e);
+      return defaultInterval;
+    }
+
+    if (interval < 0) {
+      LOG.warn("FederatedZkClient::resolveRoutingDataUpdateInterval(): invalid value: {} given "
+              + "for {}, using the default value ({} ms) instead!", interval,
+          RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, defaultInterval);
+      return defaultInterval;
+    }
+    return interval;
+  }
+
+  /**
+   * Returns whether a routing data update from the routing data source should be skipped because
+   * RoutingDataManager was reset less than {@code _routingDataUpdateInterval} ms ago.
+   *
+   * @return true if the last reset happened within the update interval; false otherwise,
+   *         including when the system clock appears to have moved backwards since the last reset
+   */
+  private boolean shouldThrottleRead() {
+    long elapsedSinceLastResetMs =
+        System.currentTimeMillis() - RoutingDataManager.getInstance().getLastResetTimestamp();
+    // A negative elapsed time means the wall clock moved backwards; do not throttle in that case,
+    // otherwise updates could be suppressed until the clock catches up with the old timestamp.
+    return elapsedSinceLastResetMs >= 0 && elapsedSinceLastResetMs < _routingDataUpdateInterval;
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
index 6df9616..853bd5c 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
@@ -51,6 +51,10 @@ public class RoutingDataManager {
   private final Map<String, MetadataStoreRoutingData> _metadataStoreRoutingDataMap =
       new ConcurrentHashMap<>();
 
+  // Wall-clock time (System.currentTimeMillis()) at which reset() was last called; 0 until then.
+  // Used to throttle routing data updates from the routing data source.
+  private volatile long _lastResetTimestamp;
+
   // Singleton instance
   private static RoutingDataManager _instance;
 
@@ -164,6 +168,17 @@ public class RoutingDataManager {
     _metadataStoreRoutingDataMap.clear();
     _defaultMsdsEndpoint =
         System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    _lastResetTimestamp = System.currentTimeMillis();
+  }
+
+  /**
+   * Returns the time at which reset() was last called, as reported by
+   * {@link System#currentTimeMillis()}.
+   *
+   * @return the last reset() timestamp in epoch milliseconds, or 0 if reset() was never called
+   */
+  public long getLastResetTimestamp() {
+    return _lastResetTimestamp;
   }
 
   /**
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
index 93e5892..e201905 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -279,6 +279,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
       throws IOException, InvalidRoutingDataException {
     // Enable routing data update upon cache miss
     System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+    // Set the routing data update interval to 0 so there's no delay in testing
+    System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0");
 
     RoutingDataManager.getInstance().getMetadataStoreRoutingData();
     _msdsServer.stopServer();
@@ -375,7 +377,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
     // Shut down MSDS
     _msdsServer.stopServer();
     // Disable System property
-    System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false");
+    System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+    System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
   }
 
   /**
@@ -402,6 +405,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
 
     // Enable routing data update upon cache miss
     System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+    // Set the routing data update interval to 0 so there's no delay in testing
+    System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0");
 
     RoutingDataManager.getInstance().reset();
     RoutingDataManager.getInstance().getMetadataStoreRoutingData(RoutingDataReaderType.ZK, zkRealm);
@@ -496,7 +501,73 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
     zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
     zkClient.close();
     // Disable System property
-    System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false");
+    System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+    System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+  }
+
+  /**
+   * Test that throttle based on last reset timestamp works correctly. Here, we use ZK as the
+   * routing data source.
+   * Test scenario: set the throttle value to a high value and check that routing data update from
+   * the routing data source does NOT happen (because it would be throttled).
+   */
+  @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK")
+  public void testRoutingDataUpdateThrottle() throws InvalidRoutingDataException {
+    // Call reset to set the last reset() timestamp in RoutingDataManager
+    RoutingDataManager.getInstance().reset();
+
+    // Set up routing data in ZK with a single realm that owns TEST_KEY_LIST_1
+    String zkRealm = "localhost:2127";
+    String newShardingKey = "/throttle";
+    ZkClient zkClient =
+        new ZkClient.Builder().setZkServer(zkRealm).setZkSerializer(new ZNRecordSerializer())
+            .build();
+    FederatedZkClient federatedZkClient = null;
+    try {
+      zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, null,
+          CreateMode.PERSISTENT);
+      ZNRecord zkRealmRecord = new ZNRecord(zkRealm);
+      zkRealmRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          new ArrayList<>(TestConstants.TEST_KEY_LIST_1));
+      zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm,
+          zkRealmRecord, CreateMode.PERSISTENT);
+
+      // Enable routing data update upon cache miss
+      System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+      // Set the throttle interval to a very long value so no update can happen during this test
+      System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS,
+          String.valueOf(Integer.MAX_VALUE));
+
+      // Create a new FederatedZkClient; it reads the interval property at construction time
+      federatedZkClient = new FederatedZkClient(
+          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+              .setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
+              .setRoutingDataSourceEndpoint(zkRealm).build(),
+          new RealmAwareZkClient.RealmAwareZkClientConfig());
+
+      // Add newShardingKey to ZK's routing data
+      zkRealmRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+          .add(newShardingKey);
+      zkClient.writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm,
+          zkRealmRecord);
+
+      try {
+        federatedZkClient.exists(newShardingKey);
+        Assert.fail("NoSuchElementException expected!");
+      } catch (NoSuchElementException e) {
+        // Expected: the cache miss must not trigger a read from the routing data source because
+        // the last reset() happened within the (very long) throttle interval
+      }
+    } finally {
+      // Clean up even if an assertion above fails, so later tests start from a clean state
+      if (federatedZkClient != null) {
+        federatedZkClient.close();
+      }
+      zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
+      zkClient.close();
+      System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+      System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+    }
   }
 
   /*
@@ -504,7 +575,7 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
    * TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This
    *  could help avoid ZkClient leakage.
    */
-  @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK")
+  @Test(dependsOnMethods = "testRoutingDataUpdateThrottle")
   public void testClose() {
     Assert.assertFalse(_realmAwareZkClient.isClosed());
 

Reply via email to