This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch HBASE-29368-key-management-feature
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to 
refs/heads/HBASE-29368-key-management-feature by this push:
     new 020adaa0643 HBASE-29643: Admin API to trigger for System Key rotation 
(#7394)
020adaa0643 is described below

commit 020adaa0643a7564939a09e1701c62cfe849300a
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Mon Oct 27 22:56:22 2025 +0530

    HBASE-29643: Admin API to trigger for System Key rotation (#7394)
---
 .rubocop.yml                                       |   9 +
 .../java/org/apache/hadoop/hbase/client/Admin.java |   6 +
 .../hadoop/hbase/client/AdminOverAsyncAdmin.java   |   5 +
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |   6 +
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       |   5 +
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  25 +++
 .../hadoop/hbase/keymeta/KeymetaAdminClient.java   |  49 +++--
 .../apache/hadoop/hbase/io/crypto/Encryption.java  |  13 +-
 .../apache/hadoop/hbase/keymeta/KeymetaAdmin.java  |  16 +-
 .../hbase/io/crypto/MockManagedKeyProvider.java    |  17 ++
 .../src/main/protobuf/server/ManagedKeys.proto     |  24 ++-
 .../src/main/protobuf/server/region/Admin.proto    |   3 +
 .../org/apache/hadoop/hbase/HBaseServerBase.java   |  13 +-
 .../hbase/client/AsyncRegionServerAdmin.java       |   5 +
 .../hadoop/hbase/keymeta/KeyManagementBase.java    |   8 +-
 .../hadoop/hbase/keymeta/KeymetaAdminImpl.java     |  66 +++++--
 .../hbase/keymeta/KeymetaServiceEndpoint.java      | 123 +++++++-----
 .../org/apache/hadoop/hbase/master/HMaster.java    |  12 ++
 .../hadoop/hbase/master/MasterRpcServices.java     |   7 +
 .../apache/hadoop/hbase/master/MasterServices.java |   3 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  24 +++
 .../apache/hadoop/hbase/security/SecurityUtil.java |  22 +--
 .../hadoop/hbase/keymeta/TestKeymetaEndpoint.java  | 160 ++++++---------
 .../hadoop/hbase/keymeta/TestManagedKeymeta.java   | 186 +++++++++++++----
 .../hbase/master/MockNoopMasterServices.java       |   5 +
 .../hadoop/hbase/master/MockRegionServer.java      |   7 +
 .../hadoop/hbase/master/TestKeymetaAdminImpl.java  | 214 +++++++++++++++++---
 .../hbase/regionserver/TestRSRpcServices.java      | 131 ++++++++++++
 .../hbase/rsgroup/VerifyingRSGroupAdmin.java       |   5 +
 hbase-shell/src/main/ruby/hbase/keymeta_admin.rb   |  17 +-
 hbase-shell/src/main/ruby/shell.rb                 |   3 +-
 .../ruby/shell/commands/enable_key_management.rb   |   2 +-
 .../{enable_key_management.rb => rotate_stk.rb}    |  27 ++-
 .../hbase/client/TestKeymetaMockProviderShell.java |  83 ++++++++
 .../src/test/ruby/shell/admin_keymeta_test.rb      |  21 +-
 .../ruby/shell/encrypted_table_keymeta_test.rb     |  25 +--
 .../shell/key_provider_keymeta_migration_test.rb   | 219 ++++++++++++---------
 .../shell/rotate_stk_keymeta_mock_provider_test.rb |  59 ++++++
 .../hadoop/hbase/thrift2/client/ThriftAdmin.java   |   6 +
 39 files changed, 1224 insertions(+), 407 deletions(-)

diff --git a/.rubocop.yml b/.rubocop.yml
index f877a052eea..e1eb10a9245 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -9,3 +9,12 @@ Layout/LineLength:
 
 Metrics/MethodLength:
   Max: 75
+
+GlobalVars:
+  AllowedVariables:
+    - $CUST1_ENCODED
+    - $CUST1_ALIAS
+    - $CUST1_ENCODED
+    - $GLOB_CUST_ENCODED
+    - $TEST
+    - $TEST_CLUSTER
\ No newline at end of file
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 1c08ec3b26f..078ac599747 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -2664,4 +2664,10 @@ public interface Admin extends Abortable, Closeable {
 
   @InterfaceAudience.Private
   void restoreBackupSystemTable(String snapshotName) throws IOException;
+
+  /**
+   * Refresh the system key cache on all specified region servers.
+   * @param regionServers the set of region servers to refresh the system key 
cache on
+   */
+  void refreshSystemKeyCacheOnAllServers(Set<ServerName> regionServers) throws 
IOException;
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
index e6bf6c3d28e..5ae99b00a7e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
@@ -1146,4 +1146,9 @@ class AdminOverAsyncAdmin implements Admin {
   public void restoreBackupSystemTable(String snapshotName) throws IOException 
{
     get(admin.restoreBackupSystemTable(snapshotName));
   }
+
+  @Override
+  public void refreshSystemKeyCacheOnAllServers(Set<ServerName> regionServers) 
throws IOException {
+    get(admin.refreshSystemKeyCacheOnAllServers(regionServers));
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index ec0556f20ac..a10746f2726 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -1874,4 +1874,10 @@ public interface AsyncAdmin {
 
   @InterfaceAudience.Private
   CompletableFuture<Void> restoreBackupSystemTable(String snapshotName);
+
+  /**
+   * Refresh the system key cache on all specified region servers.
+   * @param regionServers the set of region servers to refresh the system key 
cache on
+   */
+  CompletableFuture<Void> refreshSystemKeyCacheOnAllServers(Set<ServerName> 
regionServers);
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index b1fb2be1354..d135063ec9a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -686,6 +686,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
     return wrap(rawAdmin.updateConfiguration(groupName));
   }
 
+  @Override
+  public CompletableFuture<Void> 
refreshSystemKeyCacheOnAllServers(Set<ServerName> regionServers) {
+    return wrap(rawAdmin.refreshSystemKeyCacheOnAllServers(regionServers));
+  }
+
   @Override
   public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
     return wrap(rawAdmin.rollWALWriter(serverName));
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 710c8c43038..8a5033ff9b1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -150,6 +150,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerR
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
@@ -4662,4 +4663,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         MasterProtos.RestoreBackupSystemTableResponse::getProcId,
         new RestoreBackupSystemTableProcedureBiConsumer());
   }
+
+  @Override
+  public CompletableFuture<Void> 
refreshSystemKeyCacheOnAllServers(Set<ServerName> regionServers) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    List<CompletableFuture<Void>> futures =
+      
regionServers.stream().map(this::refreshSystemKeyCache).collect(Collectors.toList());
+    addListener(CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[0])),
+      (result, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else {
+          future.complete(result);
+        }
+      });
+    return future;
+  }
+
+  private CompletableFuture<Void> refreshSystemKeyCache(ServerName serverName) 
{
+    return this.<Void> newAdminCaller()
+      .action((controller, stub) -> this.<EmptyMsg, EmptyMsg, Void> 
adminCall(controller, stub,
+        EmptyMsg.getDefaultInstance(),
+        (s, c, req, done) -> s.refreshSystemKeyCache(controller, req, done), 
resp -> null))
+      .serverName(serverName).call();
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminClient.java
index e72e3c978ad..01a5574443d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminClient.java
@@ -25,19 +25,18 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyState;
 import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos;
-import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysRequest;
-import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyResponse;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
 
 @InterfaceAudience.Public
 public class KeymetaAdminClient implements KeymetaAdmin {
-  private static final Logger LOG = 
LoggerFactory.getLogger(KeymetaAdminClient.class);
   private ManagedKeysProtos.ManagedKeysService.BlockingInterface stub;
 
   public KeymetaAdminClient(Connection conn) throws IOException {
@@ -46,38 +45,54 @@ public class KeymetaAdminClient implements KeymetaAdmin {
   }
 
   @Override
-  public List<ManagedKeyData> enableKeyManagement(String keyCust, String 
keyNamespace)
+  public ManagedKeyData enableKeyManagement(byte[] keyCust, String 
keyNamespace)
     throws IOException {
     try {
-      ManagedKeysProtos.GetManagedKeysResponse response = 
stub.enableKeyManagement(null,
-        
ManagedKeysRequest.newBuilder().setKeyCust(keyCust).setKeyNamespace(keyNamespace).build());
-      return generateKeyDataList(response);
+      ManagedKeysProtos.ManagedKeyResponse response =
+        stub.enableKeyManagement(null, ManagedKeyRequest.newBuilder()
+          
.setKeyCust(ByteString.copyFrom(keyCust)).setKeyNamespace(keyNamespace).build());
+      return generateKeyData(response);
     } catch (ServiceException e) {
       throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
   @Override
-  public List<ManagedKeyData> getManagedKeys(String keyCust, String 
keyNamespace)
+  public List<ManagedKeyData> getManagedKeys(byte[] keyCust, String 
keyNamespace)
     throws IOException, KeyException {
     try {
-      ManagedKeysProtos.GetManagedKeysResponse statusResponse = 
stub.getManagedKeys(null,
-        
ManagedKeysRequest.newBuilder().setKeyCust(keyCust).setKeyNamespace(keyNamespace).build());
+      ManagedKeysProtos.GetManagedKeysResponse statusResponse =
+        stub.getManagedKeys(null, ManagedKeyRequest.newBuilder()
+          
.setKeyCust(ByteString.copyFrom(keyCust)).setKeyNamespace(keyNamespace).build());
       return generateKeyDataList(statusResponse);
     } catch (ServiceException e) {
       throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
+  @Override
+  public boolean rotateSTK() throws IOException {
+    try {
+      ManagedKeysProtos.RotateSTKResponse response =
+        stub.rotateSTK(null, EmptyMsg.getDefaultInstance());
+      return response.getRotated();
+    } catch (ServiceException e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
   private static List<ManagedKeyData>
     generateKeyDataList(ManagedKeysProtos.GetManagedKeysResponse 
stateResponse) {
     List<ManagedKeyData> keyStates = new ArrayList<>();
-    for (ManagedKeysResponse state : stateResponse.getStateList()) {
-      keyStates
-        .add(new ManagedKeyData(state.getKeyCustBytes().toByteArray(), 
state.getKeyNamespace(),
-          null, ManagedKeyState.forValue((byte) 
state.getKeyState().getNumber()),
-          state.getKeyMetadata(), state.getRefreshTimestamp()));
+    for (ManagedKeyResponse state : stateResponse.getStateList()) {
+      keyStates.add(generateKeyData(state));
     }
     return keyStates;
   }
+
+  private static ManagedKeyData 
generateKeyData(ManagedKeysProtos.ManagedKeyResponse response) {
+    return new ManagedKeyData(response.getKeyCust().toByteArray(), 
response.getKeyNamespace(), null,
+      ManagedKeyState.forValue((byte) response.getKeyState().getNumber()),
+      response.getKeyMetadata(), response.getRefreshTimestamp());
+  }
 }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/Encryption.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/Encryption.java
index e8d965adebb..56a6ad21173 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/Encryption.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/Encryption.java
@@ -506,12 +506,11 @@ public final class Encryption {
       // is configured
       String alternateAlgorithm = 
conf.get(HConstants.CRYPTO_ALTERNATE_KEY_ALGORITHM_CONF_KEY);
       if (alternateAlgorithm != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Unable to decrypt data with current cipher algorithm '"
-            + conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES)
-            + "'. Trying with the alternate cipher algorithm '" + 
alternateAlgorithm
-            + "' configured.");
-        }
+        LOG.debug(
+          "Unable to decrypt data with current cipher algorithm '{}'. "
+            + "Trying with the alternate cipher algorithm '{}' configured.",
+          conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES),
+          alternateAlgorithm);
         Cipher alterCipher = Encryption.getCipher(conf, alternateAlgorithm);
         if (alterCipher == null) {
           throw new RuntimeException("Cipher '" + alternateAlgorithm + "' not 
available");
@@ -575,7 +574,7 @@ public final class Encryption {
         throw new RuntimeException(e);
       }
       keyProviderCache.put(providerCacheKey, provider);
-      LOG.debug("Installed " + providerClassName + " into key provider cache");
+      LOG.debug("Installed {} into key provider cache", providerClassName);
     }
     return provider;
   }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdmin.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdmin.java
index be4f36d8802..4bf79090c3b 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdmin.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdmin.java
@@ -31,23 +31,31 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface KeymetaAdmin {
   /**
    * Enables key management for the specified custodian and namespace.
-   * @param keyCust      The key custodian in base64 encoded format.
+   * @param keyCust      The key custodian identifier.
    * @param keyNamespace The namespace for the key management.
    * @return The list of {@link ManagedKeyData} objects each identifying the 
key and its current
    *         status.
    * @throws IOException if an error occurs while enabling key management.
    */
-  List<ManagedKeyData> enableKeyManagement(String keyCust, String keyNamespace)
+  ManagedKeyData enableKeyManagement(byte[] keyCust, String keyNamespace)
     throws IOException, KeyException;
 
   /**
    * Get the status of all the keys for the specified custodian.
-   * @param keyCust      The key custodian in base64 encoded format.
+   * @param keyCust      The key custodian identifier.
    * @param keyNamespace The namespace for the key management.
    * @return The list of {@link ManagedKeyData} objects each identifying the 
key and its current
    *         status.
    * @throws IOException if an error occurs while enabling key management.
    */
-  List<ManagedKeyData> getManagedKeys(String keyCust, String keyNamespace)
+  List<ManagedKeyData> getManagedKeys(byte[] keyCust, String keyNamespace)
     throws IOException, KeyException;
+
+  /**
+   * Triggers rotation of the System Key (STK) by checking for a new key and 
propagating it to all
+   * region servers.
+   * @return true if a new STK was found and rotated, false if no change was 
detected
+   * @throws IOException if an error occurs while rotating the STK
+   */
+  boolean rotateSTK() throws IOException;
 }
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/MockManagedKeyProvider.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/MockManagedKeyProvider.java
index 6782a7d1163..a4c0d683347 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/MockManagedKeyProvider.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/MockManagedKeyProvider.java
@@ -44,6 +44,9 @@ public class MockManagedKeyProvider extends 
MockAesKeyProvider implements Manage
   private Map<String, ManagedKeyState> keyState = new HashMap<>();
   private String systemKeyAlias = "default_system_key_alias";
 
+  private boolean shouldThrowExceptionOnGetSystemKey = false;
+  private boolean shouldThrowExceptionOnGetManagedKey = false;
+
   @Override
   public void initConfig(Configuration conf, String providerParameters) {
     super.init(providerParameters);
@@ -51,11 +54,17 @@ public class MockManagedKeyProvider extends 
MockAesKeyProvider implements Manage
 
   @Override
   public ManagedKeyData getSystemKey(byte[] systemId) throws IOException {
+    if (shouldThrowExceptionOnGetSystemKey) {
+      throw new IOException("Test exception on getSystemKey");
+    }
     return getKey(systemId, systemKeyAlias, ManagedKeyData.KEY_SPACE_GLOBAL);
   }
 
   @Override
   public ManagedKeyData getManagedKey(byte[] key_cust, String key_namespace) 
throws IOException {
+    if (shouldThrowExceptionOnGetManagedKey) {
+      throw new IOException("Test exception on getManagedKey");
+    }
     String alias = Bytes.toString(key_cust);
     return getKey(key_cust, alias, key_namespace);
   }
@@ -118,6 +127,14 @@ public class MockManagedKeyProvider extends 
MockAesKeyProvider implements Manage
     return this.systemKeyAlias;
   }
 
+  public void setShouldThrowExceptionOnGetSystemKey(boolean 
shouldThrowExceptionOnGetSystemKey) {
+    this.shouldThrowExceptionOnGetSystemKey = 
shouldThrowExceptionOnGetSystemKey;
+  }
+
+  public void setShouldThrowExceptionOnGetManagedKey(boolean 
shouldThrowExceptionOnGetManagedKey) {
+    this.shouldThrowExceptionOnGetManagedKey = 
shouldThrowExceptionOnGetManagedKey;
+  }
+
   /**
    * Generate a new secret key.
    * @return the key
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ManagedKeys.proto 
b/hbase-protocol-shaded/src/main/protobuf/server/ManagedKeys.proto
index c6a3a31f618..8e633fc25ba 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/ManagedKeys.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/ManagedKeys.proto
@@ -24,8 +24,10 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 option optimize_for = SPEED;
 
-message ManagedKeysRequest {
-  required string key_cust = 1;
+import "HBase.proto";
+
+message ManagedKeyRequest {
+  required bytes key_cust = 1;
   required string key_namespace = 2;
 }
 
@@ -36,8 +38,8 @@ enum ManagedKeyState {
   KEY_DISABLED = 4;
 }
 
-message ManagedKeysResponse {
-  required string key_cust = 1;
+message ManagedKeyResponse {
+  required bytes key_cust = 1;
   required string key_namespace = 2;
   required ManagedKeyState key_state = 3;
   optional string key_metadata = 4;
@@ -45,12 +47,18 @@ message ManagedKeysResponse {
 }
 
 message GetManagedKeysResponse {
-  repeated ManagedKeysResponse state = 1;
+  repeated ManagedKeyResponse state = 1;
+}
+
+message RotateSTKResponse {
+  required bool rotated = 1;
 }
 
 service ManagedKeysService {
-  rpc EnableKeyManagement(ManagedKeysRequest)
-    returns (GetManagedKeysResponse);
-  rpc GetManagedKeys(ManagedKeysRequest)
+  rpc EnableKeyManagement(ManagedKeyRequest)
+    returns (ManagedKeyResponse);
+  rpc GetManagedKeys(ManagedKeyRequest)
     returns (GetManagedKeysResponse);
+  rpc RotateSTK(EmptyMsg)
+    returns (RotateSTKResponse);
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto 
b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 30eb328fd3c..31abe9ad417 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -420,4 +420,7 @@ service AdminService {
 
   rpc GetCachedFilesList(GetCachedFilesListRequest)
     returns(GetCachedFilesListResponse);
+
+  rpc RefreshSystemKeyCache(EmptyMsg)
+    returns(EmptyMsg);
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java
index 0993fc0f09d..eb1502685fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java
@@ -194,7 +194,7 @@ public abstract class HBaseServerBase<R extends 
HBaseRpcServicesBase<?>> extends
 
   protected final NettyEventLoopGroupConfig eventLoopGroupConfig;
 
-  private SystemKeyCache systemKeyCache;
+  protected SystemKeyCache systemKeyCache;
   protected KeymetaAdminImpl keymetaAdmin;
   protected ManagedKeyDataCache managedKeyDataCache;
 
@@ -437,6 +437,17 @@ public abstract class HBaseServerBase<R extends 
HBaseRpcServicesBase<?>> extends
     }
   }
 
+  /**
+   * Rebuilds the system key cache. This method can be called to refresh the 
system key cache when
+   * the system key has been rotated.
+   * @throws IOException if there is an error rebuilding the cache
+   */
+  public void rebuildSystemKeyCache() throws IOException {
+    if (SecurityUtil.isKeyManagementEnabled(conf)) {
+      systemKeyCache = SystemKeyCache.createCache(new SystemKeyAccessor(this));
+    }
+  }
+
   protected final void shutdownChore(ScheduledChore chore) {
     if (chore != null) {
       chore.shutdown();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 81707fe1f16..34e70634498 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -66,6 +66,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 
@@ -217,4 +218,8 @@ public class AsyncRegionServerAdmin {
     executeProcedures(ExecuteProceduresRequest request) {
     return call((stub, controller, done) -> stub.executeProcedures(controller, 
request, done));
   }
+
+  public CompletableFuture<EmptyMsg> refreshSystemKeyCache(EmptyMsg request) {
+    return call((stub, controller, done) -> 
stub.refreshSystemKeyCache(controller, request, done));
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeyManagementBase.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeyManagementBase.java
index 6fbd177437f..e263ccb4fbe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeyManagementBase.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeyManagementBase.java
@@ -134,12 +134,12 @@ public abstract class KeyManagementBase {
     /*
      * Will be useful when refresh API is implemented. if (existingActiveKey 
!= null &&
      * existingActiveKey.equals(pbeKey)) {
-     * LOG.info("retrieveManagedKey: no change in key for (custodian: {}, 
namespace: {}",
-     * encKeyCust, keyNamespace); return null; } // TODO: If existingActiveKey 
is not null, we
-     * should update the key state to INACTIVE.
+     * LOG.info("retrieveActiveKey: no change in key for (custodian: {}, 
namespace: {}", encKeyCust,
+     * keyNamespace); return null; } // TODO: If existingActiveKey is not 
null, we should update the
+     * key state to INACTIVE.
      */
     LOG.info(
-      "retrieveManagedKey: got managed key with status: {} and metadata: {} 
for "
+      "retrieveActiveKey: got active key with status: {} and metadata: {} for "
         + "(custodian: {}, namespace: {})",
       pbeKey.getKeyState(), pbeKey.getKeyMetadata(), encKeyCust, 
pbeKey.getKeyNamespace());
     if (accessor != null) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminImpl.java
index 4c16d2b59aa..030a278f023 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaAdminImpl.java
@@ -19,11 +19,15 @@ package org.apache.hadoop.hbase.keymeta;
 
 import java.io.IOException;
 import java.security.KeyException;
-import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyProvider;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,34 +41,70 @@ public class KeymetaAdminImpl extends KeymetaTableAccessor 
implements KeymetaAdm
   }
 
   @Override
-  public List<ManagedKeyData> enableKeyManagement(String keyCust, String 
keyNamespace)
+  public ManagedKeyData enableKeyManagement(byte[] keyCust, String 
keyNamespace)
     throws IOException, KeyException {
     assertKeyManagementEnabled();
-    LOG.info("Trying to enable key management on custodian: {} under 
namespace: {}", keyCust,
+    String encodedCust = ManagedKeyProvider.encodeToStr(keyCust);
+    LOG.info("Trying to enable key management on custodian: {} under 
namespace: {}", encodedCust,
       keyNamespace);
-    byte[] key_cust = ManagedKeyProvider.decodeToBytes(keyCust);
 
     // Check if (cust, namespace) pair is already enabled and has an active 
key.
-    ManagedKeyData activeKey = getActiveKey(key_cust, keyNamespace);
+    ManagedKeyData activeKey = getActiveKey(keyCust, keyNamespace);
     if (activeKey != null) {
       LOG.info(
         "enableManagedKeys: specified (custodian: {}, namespace: {}) already 
has "
           + "an active managed key with metadata: {}",
-        keyCust, keyNamespace, activeKey.getKeyMetadata());
-      return Collections.singletonList(activeKey);
+        encodedCust, keyNamespace, activeKey.getKeyMetadata());
+      return activeKey;
     }
 
     // Retrieve a single key from provider
-    ManagedKeyData retrievedKey = retrieveActiveKey(keyCust, key_cust, 
keyNamespace, this, null);
-    return Collections.singletonList(retrievedKey);
+    ManagedKeyData retrievedKey = retrieveActiveKey(encodedCust, keyCust, 
keyNamespace, this, null);
+    return retrievedKey;
   }
 
   @Override
-  public List<ManagedKeyData> getManagedKeys(String keyCust, String 
keyNamespace)
+  public List<ManagedKeyData> getManagedKeys(byte[] keyCust, String 
keyNamespace)
     throws IOException, KeyException {
     assertKeyManagementEnabled();
-    LOG.info("Getting key statuses for custodian: {} under namespace: {}", 
keyCust, keyNamespace);
-    byte[] key_cust = ManagedKeyProvider.decodeToBytes(keyCust);
-    return getAllKeys(key_cust, keyNamespace);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Getting key statuses for custodian: {} under namespace: {}",
+        ManagedKeyProvider.encodeToStr(keyCust), keyNamespace);
+    }
+    return getAllKeys(keyCust, keyNamespace);
+  }
+
+  @Override
+  public boolean rotateSTK() throws IOException {
+    assertKeyManagementEnabled();
+    if (!(getServer() instanceof MasterServices)) {
+      throw new IOException("rotateSTK can only be called on master");
+    }
+    MasterServices master = (MasterServices) getServer();
+
+    LOG.info("Checking if System Key is rotated");
+    boolean rotated = master.rotateSystemKeyIfChanged();
+
+    if (!rotated) {
+      LOG.info("No change in System Key is detected");
+      return false;
+    }
+
+    Set<ServerName> regionServers = 
master.getServerManager().getOnlineServers().keySet();
+
+    LOG.info("System Key is rotated, initiating cache refresh on all region 
servers");
+    try {
+      
FutureUtils.get(getAsyncAdmin(master).refreshSystemKeyCacheOnAllServers(regionServers));
+    } catch (Exception e) {
+      throw new IOException(
+        "Failed to initiate System Key cache refresh on one or more region 
servers", e);
+    }
+
+    LOG.info("System Key rotation and cache refresh completed successfully");
+    return true;
+  }
+
+  protected AsyncAdmin getAsyncAdmin(MasterServices master) {
+    return master.getAsyncClusterConnection().getAdmin();
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaServiceEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaServiceEndpoint.java
index 4eb19a602cc..85ff9ba3feb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaServiceEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/keymeta/KeymetaServiceEndpoint.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.keymeta;
 
 import java.io.IOException;
 import java.security.KeyException;
-import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
@@ -31,18 +31,20 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.GetManagedKeysResponse;
-import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysRequest;
-import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyResponse;
 import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysService;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.RotateSTKResponse;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
+
 /**
  * This class implements a coprocessor service endpoint for the key management 
metadata operations.
  * It handles the following methods: This endpoint is designed to work in 
conjunction with the
@@ -100,47 +102,72 @@ public class KeymetaServiceEndpoint implements 
MasterCoprocessor {
      * @param done       The callback to be invoked with the response.
      */
     @Override
-    public void enableKeyManagement(RpcController controller, 
ManagedKeysRequest request,
-      RpcCallback<GetManagedKeysResponse> done) {
-      ManagedKeysResponse.Builder builder = getResponseBuilder(controller, 
request);
-      if (builder.getKeyCust() != null && !builder.getKeyCust().isEmpty()) {
-        try {
-          List<ManagedKeyData> managedKeyStates = master.getKeymetaAdmin()
-            .enableKeyManagement(request.getKeyCust(), 
request.getKeyNamespace());
-          done.run(generateKeyStateResponse(managedKeyStates, builder));
-        } catch (IOException e) {
-          CoprocessorRpcUtils.setControllerException(controller, e);
-        } catch (KeyException e) {
-          CoprocessorRpcUtils.setControllerException(controller, new 
IOException(e));
-        }
+    public void enableKeyManagement(RpcController controller, 
ManagedKeyRequest request,
+      RpcCallback<ManagedKeyResponse> done) {
+      ManagedKeyResponse response = null;
+      ManagedKeyResponse.Builder builder = ManagedKeyResponse.newBuilder();
+      try {
+        initManagedKeyResponseBuilder(controller, request, builder);
+        ManagedKeyData managedKeyState = master.getKeymetaAdmin()
+          .enableKeyManagement(request.getKeyCust().toByteArray(), 
request.getKeyNamespace());
+        response = generateKeyStateResponse(managedKeyState, builder);
+      } catch (IOException | KeyException e) {
+        CoprocessorRpcUtils.setControllerException(controller, new 
DoNotRetryIOException(e));
+        builder.setKeyState(ManagedKeysProtos.ManagedKeyState.KEY_FAILED);
       }
+      if (response == null) {
+        response = builder.build();
+      }
+      done.run(response);
     }
 
     @Override
-    public void getManagedKeys(RpcController controller, ManagedKeysRequest 
request,
+    public void getManagedKeys(RpcController controller, ManagedKeyRequest 
request,
       RpcCallback<GetManagedKeysResponse> done) {
-      ManagedKeysResponse.Builder builder = getResponseBuilder(controller, 
request);
-      if (builder.getKeyCust() != null && !builder.getKeyCust().isEmpty()) {
-        try {
-          List<ManagedKeyData> managedKeyStates = master.getKeymetaAdmin()
-            .getManagedKeys(request.getKeyCust(), request.getKeyNamespace());
-          done.run(generateKeyStateResponse(managedKeyStates, builder));
-        } catch (IOException e) {
-          CoprocessorRpcUtils.setControllerException(controller, e);
-        } catch (KeyException e) {
-          CoprocessorRpcUtils.setControllerException(controller, new 
IOException(e));
-        }
+      GetManagedKeysResponse keyStateResponse = null;
+      ManagedKeyResponse.Builder builder = ManagedKeyResponse.newBuilder();
+      try {
+        initManagedKeyResponseBuilder(controller, request, builder);
+        List<ManagedKeyData> managedKeyStates = master.getKeymetaAdmin()
+          .getManagedKeys(request.getKeyCust().toByteArray(), 
request.getKeyNamespace());
+        keyStateResponse = generateKeyStateResponse(managedKeyStates, builder);
+      } catch (IOException | KeyException e) {
+        CoprocessorRpcUtils.setControllerException(controller, new 
DoNotRetryIOException(e));
+      }
+      if (keyStateResponse == null) {
+        keyStateResponse = GetManagedKeysResponse.getDefaultInstance();
+      }
+      done.run(keyStateResponse);
+    }
+
+    /**
+     * Rotates the system key (STK) by checking for a new key and propagating 
it to all region
+     * servers.
+     * @param controller The RPC controller.
+     * @param request    The request (empty).
+     * @param done       The callback to be invoked with the response.
+     */
+    @Override
+    public void rotateSTK(RpcController controller, EmptyMsg request,
+      RpcCallback<RotateSTKResponse> done) {
+      boolean rotated;
+      try {
+        rotated = master.getKeymetaAdmin().rotateSTK();
+      } catch (IOException e) {
+        CoprocessorRpcUtils.setControllerException(controller, new 
DoNotRetryIOException(e));
+        rotated = false;
       }
+      done.run(RotateSTKResponse.newBuilder().setRotated(rotated).build());
     }
   }
 
   @InterfaceAudience.Private
-  public static ManagedKeysResponse.Builder getResponseBuilder(RpcController 
controller,
-    ManagedKeysRequest request) {
-    ManagedKeysResponse.Builder builder = ManagedKeysResponse.newBuilder();
-    byte[] key_cust = convertToKeyCustBytes(controller, request, builder);
-    if (key_cust != null) {
-      builder.setKeyCustBytes(ByteString.copyFrom(key_cust));
+  public static ManagedKeyResponse.Builder 
initManagedKeyResponseBuilder(RpcController controller,
+    ManagedKeyRequest request, ManagedKeyResponse.Builder builder) throws 
IOException {
+    builder.setKeyCust(request.getKeyCust());
+    builder.setKeyNamespace(request.getKeyNamespace());
+    if (request.getKeyCust().isEmpty()) {
+      throw new IOException("key_cust must not be empty");
     }
     return builder;
   }
@@ -148,29 +175,19 @@ public class KeymetaServiceEndpoint implements 
MasterCoprocessor {
   // Assumes that all ManagedKeyData objects belong to the same custodian and 
namespace.
   @InterfaceAudience.Private
   public static GetManagedKeysResponse generateKeyStateResponse(
-    List<ManagedKeyData> managedKeyStates, ManagedKeysResponse.Builder 
builder) {
+    List<ManagedKeyData> managedKeyStates, ManagedKeyResponse.Builder builder) 
{
     GetManagedKeysResponse.Builder responseBuilder = 
GetManagedKeysResponse.newBuilder();
     for (ManagedKeyData keyData : managedKeyStates) {
-      builder
-        
.setKeyState(ManagedKeysProtos.ManagedKeyState.forNumber(keyData.getKeyState().getVal()))
-        
.setKeyMetadata(keyData.getKeyMetadata()).setRefreshTimestamp(keyData.getRefreshTimestamp())
-        .setKeyNamespace(keyData.getKeyNamespace());
-      responseBuilder.addState(builder.build());
+      responseBuilder.addState(generateKeyStateResponse(keyData, builder));
     }
     return responseBuilder.build();
   }
 
-  @InterfaceAudience.Private
-  public static byte[] convertToKeyCustBytes(RpcController controller, 
ManagedKeysRequest request,
-    ManagedKeysResponse.Builder builder) {
-    byte[] key_cust = null;
-    try {
-      key_cust = Base64.getDecoder().decode(request.getKeyCust());
-    } catch (IllegalArgumentException e) {
-      builder.setKeyState(ManagedKeysProtos.ManagedKeyState.KEY_FAILED);
-      CoprocessorRpcUtils.setControllerException(controller, new IOException(
-        "Failed to decode specified prefix as Base64 string: " + 
request.getKeyCust(), e));
-    }
-    return key_cust;
+  private static ManagedKeyResponse generateKeyStateResponse(ManagedKeyData 
keyData,
+    ManagedKeyResponse.Builder builder) {
+    
builder.setKeyState(ManagedKeysProtos.ManagedKeyState.forNumber(keyData.getKeyState().getVal()))
+      
.setKeyMetadata(keyData.getKeyMetadata()).setRefreshTimestamp(keyData.getRefreshTimestamp())
+      .setKeyNamespace(keyData.getKeyNamespace());
+    return builder.build();
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d369a19d969..7037656419b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.http.HttpServer;
 import org.apache.hadoop.hbase.http.InfoServer;
+import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -1640,6 +1641,17 @@ public class HMaster extends 
HBaseServerBase<MasterRpcServices> implements Maste
     return this.walManager;
   }
 
+  @Override
+  public boolean rotateSystemKeyIfChanged() throws IOException {
+    ManagedKeyData newKey = this.systemKeyManager.rotateSystemKeyIfChanged();
+    if (newKey != null) {
+      this.systemKeyCache = null;
+      buildSystemKeyCache();
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public SplitWALManager getSplitWALManager() {
     return splitWALManager;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index e9e0f970ef8..c63a1e7e8ec 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -193,6 +193,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -3621,6 +3622,12 @@ public class MasterRpcServices extends 
HBaseRpcServicesBase<HMaster>
     throw new ServiceException(new DoNotRetryIOException("Unsupported method 
on master"));
   }
 
+  @Override
+  public EmptyMsg refreshSystemKeyCache(RpcController controller, EmptyMsg 
request)
+    throws ServiceException {
+    throw new ServiceException(new DoNotRetryIOException("Unsupported method 
on master"));
+  }
+
   @Override
   public GetLiveRegionServersResponse getLiveRegionServers(RpcController 
controller,
     GetLiveRegionServersRequest request) throws ServiceException {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index dee9b48f9ea..745b962860b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -87,6 +87,9 @@ public interface MasterServices extends Server, 
KeyManagementService {
   /** Returns Master's WALs {@link MasterWalManager} utility class. */
   MasterWalManager getMasterWalManager();
 
+  /** Rotates the system key if changed, returns true if a new key was 
detected and rotated */
+  boolean rotateSystemKeyIfChanged() throws IOException;
+
   /** Returns Master's {@link ServerManager} instance. */
   ServerManager getServerManager();
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fdfea375e09..ed2f81a947d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -234,6 +234,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -4058,6 +4059,29 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
     return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
   }
 
+  /**
+   * Refreshes the system key cache on the region server by rebuilding it with 
the latest keys. This
+   * is called by the master when a system key rotation has occurred.
+   * @param controller the RPC controller
+   * @param request    the request
+   * @return empty response
+   */
+  @Override
+  @QosPriority(priority = HConstants.ADMIN_QOS)
+  public EmptyMsg refreshSystemKeyCache(final RpcController controller, final 
EmptyMsg request)
+    throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      LOG.info("Received RefreshSystemKeyCache request, rebuilding system key 
cache");
+      server.rebuildSystemKeyCache();
+      return EmptyMsg.getDefaultInstance();
+    } catch (IOException ie) {
+      LOG.error("Failed to rebuild system key cache", ie);
+      throw new ServiceException(ie);
+    }
+  }
+
   RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest 
request,
     ScanResponse.Builder builder) throws IOException {
     if (request.hasScannerId()) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java
index 3fe2937e4d6..b104f468760 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java
@@ -82,8 +82,10 @@ public class SecurityUtil {
     boolean isKeyManagementEnabled = isKeyManagementEnabled(conf);
     String cipherName = family.getEncryptionType();
     String keyNamespace = null; // Will be set by fallback logic
-    LOG.debug("Creating encryption context for table: {} and column family: 
{}",
-      tableDescriptor.getTableName().getNameAsString(), 
family.getNameAsString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating encryption context for table: {} and column family: 
{}",
+        tableDescriptor.getTableName().getNameAsString(), 
family.getNameAsString());
+    }
     if (cipherName != null) {
       if (!Encryption.isEncryptionEnabled(conf)) {
         throw new IllegalStateException("Encryption for family '" + 
family.getNameAsString()
@@ -108,10 +110,8 @@ public class SecurityUtil {
             // Scenario 1b: If key management is disabled, unwrap the key 
using master key.
             key = EncryptionUtil.unwrapKey(conf, familyKeyBytes);
           }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Scenario 1: Use family key for namespace {} cipher: {} "
-              + "key management enabled: {}", keyNamespace, cipherName, 
isKeyManagementEnabled);
-          }
+          LOG.debug("Scenario 1: Use family key for namespace {} cipher: {} "
+            + "key management enabled: {}", keyNamespace, cipherName, 
isKeyManagementEnabled);
         } catch (KeyException e) {
           throw new IOException(e);
         }
@@ -133,8 +133,10 @@ public class SecurityUtil {
           for (String candidate : candidateNamespaces) {
             if (candidate != null) {
               // Log information on the table and column family we are looking 
for the active key in
-              LOG.debug("Looking for active key for table: {} and column 
family: {}",
-                tableDescriptor.getTableName().getNameAsString(), 
family.getNameAsString());
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Looking for active key for table: {} and column 
family: {}",
+                  tableDescriptor.getTableName().getNameAsString(), 
family.getNameAsString());
+              }
               activeKeyData = managedKeyDataCache
                 .getActiveEntry(ManagedKeyData.KEY_GLOBAL_CUSTODIAN_BYTES, 
candidate);
               if (activeKeyData != null) {
@@ -216,9 +218,7 @@ public class SecurityUtil {
     ManagedKeyData kekKeyData = null;
     byte[] keyBytes = trailer.getEncryptionKey();
     Encryption.Context cryptoContext = Encryption.Context.NONE;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating encryption context for path: {}", path);
-    }
+    LOG.debug("Creating encryption context for path: {}", path);
     // Check for any key material available
     if (keyBytes != null) {
       cryptoContext = Encryption.newContext(conf);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestKeymetaEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestKeymetaEndpoint.java
index 7c884bdd27e..56c92c873c0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestKeymetaEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestKeymetaEndpoint.java
@@ -18,14 +18,13 @@
 package org.apache.hadoop.hbase.keymeta;
 
 import static org.apache.hadoop.hbase.io.crypto.ManagedKeyState.ACTIVE;
-import static 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyState.KEY_ACTIVE;
-import static 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyState.KEY_FAILED;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -37,7 +36,6 @@ import java.io.IOException;
 import java.security.KeyException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.List;
 import javax.crypto.spec.SecretKeySpec;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -46,9 +44,10 @@ import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
 import 
org.apache.hadoop.hbase.keymeta.KeymetaServiceEndpoint.KeymetaAdminServiceImpl;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.GetManagedKeysResponse;
-import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysRequest;
-import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeysResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos.ManagedKeyResponse;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -80,17 +79,20 @@ public class TestKeymetaEndpoint {
   @Mock
   private MasterServices master;
   @Mock
-  private RpcCallback done;
+  private RpcCallback<ManagedKeyResponse> enableKeyManagementDone;
   @Mock
-  private KeymetaAdmin keymetaAdmin;
+  private RpcCallback<GetManagedKeysResponse> getManagedKeysDone;
 
   KeymetaServiceEndpoint keymetaServiceEndpoint;
-  private ManagedKeysResponse.Builder responseBuilder;
-  private ManagedKeysRequest.Builder requestBuilder;
+  private ManagedKeyResponse.Builder responseBuilder;
+  private ManagedKeyRequest.Builder requestBuilder;
   private KeymetaAdminServiceImpl keyMetaAdminService;
   private ManagedKeyData keyData1;
   private ManagedKeyData keyData2;
 
+  @Mock
+  private KeymetaAdmin keymetaAdmin;
+
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
@@ -101,9 +103,10 @@ public class TestKeymetaEndpoint {
     keymetaServiceEndpoint.start(env);
     keyMetaAdminService =
       (KeymetaAdminServiceImpl) 
keymetaServiceEndpoint.getServices().iterator().next();
-    responseBuilder = ManagedKeysResponse.newBuilder().setKeyState(KEY_ACTIVE);
+    responseBuilder =
+      
ManagedKeyResponse.newBuilder().setKeyState(ManagedKeysProtos.ManagedKeyState.KEY_ACTIVE);
     requestBuilder =
-      
ManagedKeysRequest.newBuilder().setKeyNamespace(ManagedKeyData.KEY_SPACE_GLOBAL);
+      
ManagedKeyRequest.newBuilder().setKeyNamespace(ManagedKeyData.KEY_SPACE_GLOBAL);
     keyData1 = new ManagedKeyData(KEY_CUST.getBytes(), KEY_NAMESPACE,
       new SecretKeySpec("key1".getBytes(), "AES"), ACTIVE, KEY_METADATA1);
     keyData2 = new ManagedKeyData(KEY_CUST.getBytes(), KEY_NAMESPACE,
@@ -112,75 +115,33 @@ public class TestKeymetaEndpoint {
   }
 
   @Test
-  public void testConvertToKeyCustBytesValid() {
-    // Arrange
-    String validBase64 = 
Base64.getEncoder().encodeToString("testKey".getBytes());
-    ManagedKeysRequest request = 
requestBuilder.setKeyCust(validBase64).build();
+  public void testCreateResponseBuilderValid() throws IOException {
+    byte[] cust = "testKey".getBytes();
+    ManagedKeyRequest request = 
requestBuilder.setKeyCust(ByteString.copyFrom(cust)).build();
 
-    // Act
-    byte[] result =
-      KeymetaServiceEndpoint.convertToKeyCustBytes(controller, request, 
responseBuilder);
+    ManagedKeyResponse.Builder result = ManagedKeyResponse.newBuilder();
+    KeymetaServiceEndpoint.initManagedKeyResponseBuilder(controller, request, 
result);
 
-    // Assert
     assertNotNull(result);
-    assertArrayEquals("testKey".getBytes(), result);
-    assertEquals(KEY_ACTIVE, responseBuilder.getKeyState());
+    assertArrayEquals(cust, result.getKeyCust().toByteArray());
     verify(controller, never()).setFailed(anyString());
   }
 
   @Test
-  public void testConvertToKeyCustBytesInvalid() {
-    // Arrange
-    String invalidBase64 = "invalid!Base64@String";
-    ManagedKeysRequest request = 
requestBuilder.setKeyCust(invalidBase64).build();
+  public void testCreateResponseBuilderEmptyCust() throws IOException {
+    ManagedKeyRequest request = 
requestBuilder.setKeyCust(ByteString.EMPTY).build();
 
-    // Act
-    byte[] result =
-      KeymetaServiceEndpoint.convertToKeyCustBytes(controller, request, 
responseBuilder);
+    IOException exception = assertThrows(IOException.class, () -> 
KeymetaServiceEndpoint
+      .initManagedKeyResponseBuilder(controller, request, 
ManagedKeyResponse.newBuilder()));
 
-    // Assert
-    assertNull(result);
-    assertEquals(KEY_FAILED, responseBuilder.getKeyState());
-    verify(controller).setFailed(anyString());
-  }
-
-  @Test
-  public void testGetResponseBuilder() {
-    // Arrange
-    String keyCust = Base64.getEncoder().encodeToString("testKey".getBytes());
-    ManagedKeysRequest request = requestBuilder.setKeyCust(keyCust).build();
-
-    // Act
-    ManagedKeysResponse.Builder result =
-      KeymetaServiceEndpoint.getResponseBuilder(controller, request);
-
-    // Assert
-    assertNotNull(result);
-    assertArrayEquals("testKey".getBytes(), 
result.getKeyCustBytes().toByteArray());
-    verify(controller, never()).setFailed(anyString());
-  }
-
-  @Test
-  public void testGetResponseBuilderWithInvalidBase64() {
-    // Arrange
-    String keyCust = "invalidBase64!";
-    ManagedKeysRequest request = requestBuilder.setKeyCust(keyCust).build();
-
-    // Act
-    ManagedKeysResponse.Builder result =
-      KeymetaServiceEndpoint.getResponseBuilder(controller, request);
-
-    // Assert
-    assertNotNull(result);
-    assertEquals(KEY_FAILED, result.getKeyState());
-    verify(controller).setFailed(contains("Failed to decode specified prefix 
as Base64 string"));
+    assertEquals("key_cust must not be empty", exception.getMessage());
   }
 
   @Test
   public void testGenerateKeyStateResponse() throws Exception {
     // Arrange
-    ManagedKeysResponse response =
-      
responseBuilder.setKeyCustBytes(ByteString.copyFrom(keyData1.getKeyCustodian()))
+    ManagedKeyResponse response =
+      
responseBuilder.setKeyCust(ByteString.copyFrom(keyData1.getKeyCustodian()))
         .setKeyNamespace(keyData1.getKeyNamespace()).build();
     List<ManagedKeyData> managedKeyStates = Arrays.asList(keyData1, keyData2);
 
@@ -192,9 +153,10 @@ public class TestKeymetaEndpoint {
     assertNotNull(response);
     assertNotNull(result.getStateList());
     assertEquals(2, result.getStateList().size());
-    assertEquals(KEY_ACTIVE, result.getStateList().get(0).getKeyState());
+    assertEquals(ManagedKeysProtos.ManagedKeyState.KEY_ACTIVE,
+      result.getStateList().get(0).getKeyState());
     assertEquals(0, Bytes.compareTo(keyData1.getKeyCustodian(),
-      result.getStateList().get(0).getKeyCustBytes().toByteArray()));
+      result.getStateList().get(0).getKeyCust().toByteArray()));
     assertEquals(keyData1.getKeyNamespace(), 
result.getStateList().get(0).getKeyNamespace());
     verify(controller, never()).setFailed(anyString());
   }
@@ -202,8 +164,8 @@ public class TestKeymetaEndpoint {
   @Test
   public void testGenerateKeyStateResponse_Empty() throws Exception {
     // Arrange
-    ManagedKeysResponse response =
-      
responseBuilder.setKeyCustBytes(ByteString.copyFrom(keyData1.getKeyCustodian()))
+    ManagedKeyResponse response =
+      
responseBuilder.setKeyCust(ByteString.copyFrom(keyData1.getKeyCustodian()))
         .setKeyNamespace(keyData1.getKeyNamespace()).build();
     List<ManagedKeyData> managedKeyStates = new ArrayList<>();
 
@@ -221,20 +183,22 @@ public class TestKeymetaEndpoint {
   @Test
   public void testGenerateKeyStatResponse_Success() throws Exception {
     doTestServiceCallForSuccess((controller, request, done) -> 
keyMetaAdminService
-      .enableKeyManagement(controller, request, done));
+      .enableKeyManagement(controller, request, done), 
enableKeyManagementDone);
   }
 
   @Test
   public void testGetManagedKeys_Success() throws Exception {
     doTestServiceCallForSuccess(
-      (controller, request, done) -> 
keyMetaAdminService.getManagedKeys(controller, request, done));
+      (controller, request, done) -> 
keyMetaAdminService.getManagedKeys(controller, request, done),
+      getManagedKeysDone);
   }
 
-  private void doTestServiceCallForSuccess(ServiceCall svc) throws Exception {
+  private <T> void doTestServiceCallForSuccess(ServiceCall<T> svc, 
RpcCallback<T> done)
+    throws Exception {
     // Arrange
-    ManagedKeysRequest request = requestBuilder.setKeyCust(KEY_CUST).build();
-    List<ManagedKeyData> managedKeyStates = Arrays.asList(keyData1);
-    when(keymetaAdmin.enableKeyManagement(any(), 
any())).thenReturn(managedKeyStates);
+    ManagedKeyRequest request =
+      
requestBuilder.setKeyCust(ByteString.copyFrom(KEY_CUST.getBytes())).build();
+    when(keymetaAdmin.enableKeyManagement(any(), any())).thenReturn(keyData1);
 
     // Act
     svc.call(controller, request, done);
@@ -244,39 +208,41 @@ public class TestKeymetaEndpoint {
     verify(controller, never()).setFailed(anyString());
   }
 
-  private interface ServiceCall {
-    void call(RpcController controller, ManagedKeysRequest request,
-      RpcCallback<GetManagedKeysResponse> done) throws Exception;
+  private interface ServiceCall<T> {
+    void call(RpcController controller, ManagedKeyRequest request, 
RpcCallback<T> done)
+      throws Exception;
   }
 
   @Test
   public void testGenerateKeyStateResponse_InvalidCust() throws Exception {
     // Arrange
-    String invalidBase64 = "invalid!Base64@String";
-    ManagedKeysRequest request = 
requestBuilder.setKeyCust(invalidBase64).build();
+    ManagedKeyRequest request = 
requestBuilder.setKeyCust(ByteString.EMPTY).build();
 
     // Act
-    keyMetaAdminService.enableKeyManagement(controller, request, done);
+    keyMetaAdminService.enableKeyManagement(controller, request, 
enableKeyManagementDone);
 
     // Assert
-    verify(controller).setFailed(contains("IOException"));
+    verify(controller).setFailed(contains("key_cust must not be empty"));
     verify(keymetaAdmin, never()).enableKeyManagement(any(), any());
-    verify(done, never()).run(any());
+    verify(enableKeyManagementDone).run(
+      argThat(response -> response.getKeyState() == 
ManagedKeysProtos.ManagedKeyState.KEY_FAILED));
   }
 
   @Test
   public void testGenerateKeyStateResponse_IOException() throws Exception {
     // Arrange
     when(keymetaAdmin.enableKeyManagement(any(), 
any())).thenThrow(IOException.class);
-    ManagedKeysRequest request = requestBuilder.setKeyCust(KEY_CUST).build();
+    ManagedKeyRequest request =
+      
requestBuilder.setKeyCust(ByteString.copyFrom(KEY_CUST.getBytes())).build();
 
     // Act
-    keyMetaAdminService.enableKeyManagement(controller, request, done);
+    keyMetaAdminService.enableKeyManagement(controller, request, 
enableKeyManagementDone);
 
     // Assert
     verify(controller).setFailed(contains("IOException"));
     verify(keymetaAdmin).enableKeyManagement(any(), any());
-    verify(done, never()).run(any());
+    verify(enableKeyManagementDone).run(
+      argThat(response -> response.getKeyState() == 
ManagedKeysProtos.ManagedKeyState.KEY_FAILED));
   }
 
   @Test
@@ -292,29 +258,27 @@ public class TestKeymetaEndpoint {
   private void doTestGetManagedKeysError(Class<? extends Exception> exType) 
throws Exception {
     // Arrange
     when(keymetaAdmin.getManagedKeys(any(), any())).thenThrow(exType);
-    ManagedKeysRequest request = requestBuilder.setKeyCust(KEY_CUST).build();
+    ManagedKeyRequest request =
+      
requestBuilder.setKeyCust(ByteString.copyFrom(KEY_CUST.getBytes())).build();
 
     // Act
-    keyMetaAdminService.getManagedKeys(controller, request, done);
+    keyMetaAdminService.getManagedKeys(controller, request, 
getManagedKeysDone);
 
     // Assert
     verify(controller).setFailed(contains(exType.getSimpleName()));
     verify(keymetaAdmin).getManagedKeys(any(), any());
-    verify(done, never()).run(any());
+    
verify(getManagedKeysDone).run(GetManagedKeysResponse.getDefaultInstance());
   }
 
   @Test
   public void testGetManagedKeys_InvalidCust() throws Exception {
     // Arrange
-    String invalidBase64 = "invalid!Base64@String";
-    ManagedKeysRequest request = 
requestBuilder.setKeyCust(invalidBase64).build();
+    ManagedKeyRequest request = 
requestBuilder.setKeyCust(ByteString.EMPTY).build();
 
-    // Act
-    keyMetaAdminService.getManagedKeys(controller, request, done);
+    keyMetaAdminService.getManagedKeys(controller, request, 
getManagedKeysDone);
 
-    // Assert
-    verify(controller).setFailed(contains("IOException"));
+    verify(controller).setFailed(contains("key_cust must not be empty"));
     verify(keymetaAdmin, never()).getManagedKeys(any(), any());
-    verify(done, never()).run(any());
+    verify(getManagedKeysDone).run(argThat(response -> 
response.getStateList().isEmpty()));
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestManagedKeymeta.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestManagedKeymeta.java
index 63f05e7ee5e..75beb9f8370 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestManagedKeymeta.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/keymeta/TestManagedKeymeta.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.keymeta;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -29,14 +31,16 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.security.KeyException;
 import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
-import org.apache.hadoop.hbase.io.crypto.ManagedKeyProvider;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyState;
 import org.apache.hadoop.hbase.io.crypto.MockManagedKeyProvider;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.generated.ManagedKeysProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.ClassRule;
@@ -45,6 +49,9 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
+/**
+ * Tests the admin API via both RPC and local calls.
+ */
 @Category({ MasterTests.class, MediumTests.class })
 public class TestManagedKeymeta extends ManagedKeyTestBase {
 
@@ -70,76 +77,183 @@ public class TestManagedKeymeta extends ManagedKeyTestBase 
{
     MockManagedKeyProvider managedKeyProvider =
       (MockManagedKeyProvider) 
Encryption.getManagedKeyProvider(master.getConfiguration());
     String cust = "cust1";
-    String encodedCust = ManagedKeyProvider.encodeToStr(cust.getBytes());
-    List<ManagedKeyData> managedKeyStates =
-      adminClient.enableKeyManagement(encodedCust, 
ManagedKeyData.KEY_SPACE_GLOBAL);
-    assertKeyDataListSingleKey(managedKeyStates, ManagedKeyState.ACTIVE);
+    byte[] custBytes = cust.getBytes();
+    ManagedKeyData managedKey =
+      adminClient.enableKeyManagement(custBytes, 
ManagedKeyData.KEY_SPACE_GLOBAL);
+    assertKeyDataSingleKey(managedKey, ManagedKeyState.ACTIVE);
 
     List<ManagedKeyData> managedKeys =
-      adminClient.getManagedKeys(encodedCust, ManagedKeyData.KEY_SPACE_GLOBAL);
-    assertEquals(1, managedKeys.size());
+      adminClient.getManagedKeys(custBytes, ManagedKeyData.KEY_SPACE_GLOBAL);
     assertEquals(managedKeyProvider.getLastGeneratedKeyData(cust, 
ManagedKeyData.KEY_SPACE_GLOBAL)
       .cloneWithoutKey(), managedKeys.get(0).cloneWithoutKey());
 
     String nonExistentCust = "nonExistentCust";
+    byte[] nonExistentBytes = nonExistentCust.getBytes();
     managedKeyProvider.setMockedKeyState(nonExistentCust, 
ManagedKeyState.FAILED);
-    List<ManagedKeyData> keyDataList1 = adminClient.enableKeyManagement(
-      ManagedKeyProvider.encodeToStr(nonExistentCust.getBytes()), 
ManagedKeyData.KEY_SPACE_GLOBAL);
-    assertKeyDataListSingleKey(keyDataList1, ManagedKeyState.FAILED);
+    ManagedKeyData managedKey1 =
+      adminClient.enableKeyManagement(nonExistentBytes, 
ManagedKeyData.KEY_SPACE_GLOBAL);
+    assertKeyDataSingleKey(managedKey1, ManagedKeyState.FAILED);
 
     String disabledCust = "disabledCust";
+    byte[] disabledBytes = disabledCust.getBytes();
     managedKeyProvider.setMockedKeyState(disabledCust, 
ManagedKeyState.DISABLED);
-    List<ManagedKeyData> keyDataList2 = adminClient.enableKeyManagement(
-      ManagedKeyProvider.encodeToStr(disabledCust.getBytes()), 
ManagedKeyData.KEY_SPACE_GLOBAL);
-    assertKeyDataListSingleKey(keyDataList2, ManagedKeyState.DISABLED);
+    ManagedKeyData managedKey2 =
+      adminClient.enableKeyManagement(disabledBytes, 
ManagedKeyData.KEY_SPACE_GLOBAL);
+    assertKeyDataSingleKey(managedKey2, ManagedKeyState.DISABLED);
   }
 
-  private static void assertKeyDataListSingleKey(List<ManagedKeyData> 
managedKeyStates,
+  private static void assertKeyDataSingleKey(ManagedKeyData managedKeyState,
     ManagedKeyState keyState) {
-    assertNotNull(managedKeyStates);
-    assertEquals(1, managedKeyStates.size());
-    assertEquals(keyState, managedKeyStates.get(0).getKeyState());
+    assertNotNull(managedKeyState);
+    assertEquals(keyState, managedKeyState.getKeyState());
   }
 
   @Test
-  public void testEnableKeyManagementWithServiceException() throws Exception {
-    ManagedKeysProtos.ManagedKeysService.BlockingInterface mockStub =
-      mock(ManagedKeysProtos.ManagedKeysService.BlockingInterface.class);
-
-    ServiceException networkError = new ServiceException("Network error");
-    networkError.initCause(new IOException("Network error"));
-    when(mockStub.enableKeyManagement(any(), any())).thenThrow(networkError);
+  public void testEnableKeyManagementWithExceptionOnGetManagedKey() throws 
Exception {
+    MockManagedKeyProvider managedKeyProvider =
+      (MockManagedKeyProvider) 
Encryption.getManagedKeyProvider(TEST_UTIL.getConfiguration());
+    managedKeyProvider.setShouldThrowExceptionOnGetManagedKey(true);
+    KeymetaAdmin adminClient = new 
KeymetaAdminClient(TEST_UTIL.getConnection());
+    IOException exception = assertThrows(IOException.class,
+      () -> adminClient.enableKeyManagement(new byte[0], "namespace"));
+    assertTrue(exception.getMessage().contains("key_cust must not be empty"));
+  }
 
-    KeymetaAdminClient client = new 
KeymetaAdminClient(TEST_UTIL.getConnection());
-    // Use reflection to set the stub
-    Field stubField = KeymetaAdminClient.class.getDeclaredField("stub");
-    stubField.setAccessible(true);
-    stubField.set(client, mockStub);
+  @Test
+  public void testEnableKeyManagementWithClientSideServiceException() throws 
Exception {
+    doTestWithClientSideServiceException((mockStub, networkError) -> {
+      try {
+        when(mockStub.enableKeyManagement(any(), 
any())).thenThrow(networkError);
+      } catch (ServiceException e) {
+        // We are just setting up the mock, so no exception is expected here.
+        throw new RuntimeException("Unexpected ServiceException", e);
+      }
+      return null;
+    }, (client) -> {
+      try {
+        client.enableKeyManagement(new byte[0], "namespace");
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return null;
+    });
+  }
 
-    IOException exception = assertThrows(IOException.class, () -> {
-      client.enableKeyManagement("cust", "namespace");
+  @Test
+  public void testGetManagedKeysWithClientSideServiceException() throws 
Exception {
+    // Similar test for getManagedKeys method
+    doTestWithClientSideServiceException((mockStub, networkError) -> {
+      try {
+        when(mockStub.getManagedKeys(any(), any())).thenThrow(networkError);
+      } catch (ServiceException e) {
+        // We are just setting up the mock, so no exception is expected here.
+        throw new RuntimeException("Unexpected ServiceException", e);
+      }
+      return null;
+    }, (client) -> {
+      try {
+        client.getManagedKeys(new byte[0], "namespace");
+      } catch (IOException | KeyException e) {
+        throw new RuntimeException(e);
+      }
+      return null;
     });
+  }
 
-    assertTrue(exception.getMessage().contains("Network error"));
+  @Test
+  public void testRotateSTKLocal() throws Exception {
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    KeymetaAdmin keymetaAdmin = master.getKeymetaAdmin();
+    doTestRotateSTK(keymetaAdmin);
   }
 
   @Test
-  public void testGetManagedKeysWithServiceException() throws Exception {
-    // Similar test for getManagedKeys method
+  public void testRotateSTKOverRPC() throws Exception {
+    KeymetaAdmin adminClient = new 
KeymetaAdminClient(TEST_UTIL.getConnection());
+    doTestRotateSTK(adminClient);
+  }
+
+  private void doTestRotateSTK(KeymetaAdmin adminClient) throws IOException {
+    // Call rotateSTK - since no actual system key change has occurred,
+    // this should return false (no rotation performed)
+    boolean result = adminClient.rotateSTK();
+    assertFalse("rotateSTK should return false when no key change is 
detected", result);
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    ManagedKeyData currentSystemKey = 
master.getSystemKeyCache().getLatestSystemKey();
+
+    MockManagedKeyProvider managedKeyProvider =
+      (MockManagedKeyProvider) 
Encryption.getManagedKeyProvider(TEST_UTIL.getConfiguration());
+    // Once we enable multikeyGenMode on MockManagedKeyProvider, every call 
should return a new key
+    // which should trigger a rotation.
+    managedKeyProvider.setMultikeyGenMode(true);
+    result = adminClient.rotateSTK();
+    assertTrue("rotateSTK should return true when a new key is detected", 
result);
+
+    ManagedKeyData newSystemKey = 
master.getSystemKeyCache().getLatestSystemKey();
+    assertNotEquals("newSystemKey should be different from currentSystemKey", 
currentSystemKey,
+      newSystemKey);
+
+    HRegionServer regionServer = 
TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    assertEquals("regionServer should have the same new system key", 
newSystemKey,
+      regionServer.getSystemKeyCache().getLatestSystemKey());
+
+  }
+
+  @Test
+  public void testRotateSTKWithExceptionOnGetSystemKey() throws Exception {
+    MockManagedKeyProvider managedKeyProvider =
+      (MockManagedKeyProvider) 
Encryption.getManagedKeyProvider(TEST_UTIL.getConfiguration());
+    managedKeyProvider.setShouldThrowExceptionOnGetSystemKey(true);
+    KeymetaAdmin adminClient = new 
KeymetaAdminClient(TEST_UTIL.getConnection());
+    IOException exception = assertThrows(IOException.class, () -> 
adminClient.rotateSTK());
+    assertTrue(exception.getMessage().contains("Test exception on 
getSystemKey"));
+  }
+
+  @Test
+  public void testRotateSTKWithClientSideServiceException() throws Exception {
+    doTestWithClientSideServiceException((mockStub, networkError) -> {
+      try {
+        when(mockStub.rotateSTK(any(), any())).thenThrow(networkError);
+      } catch (ServiceException e) {
+        // We are just setting up the mock, so no exception is expected here.
+        throw new RuntimeException("Unexpected ServiceException", e);
+      }
+      return null;
+    }, (client) -> {
+      try {
+        client.rotateSTK();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return null;
+    });
+  }
+
+  private void
+    doTestWithClientSideServiceException(BiFunction<
+      ManagedKeysProtos.ManagedKeysService.BlockingInterface, 
ServiceException, Void> setupFunction,
+      Function<KeymetaAdminClient, Void> testFunction) throws Exception {
     ManagedKeysProtos.ManagedKeysService.BlockingInterface mockStub =
       mock(ManagedKeysProtos.ManagedKeysService.BlockingInterface.class);
 
     ServiceException networkError = new ServiceException("Network error");
     networkError.initCause(new IOException("Network error"));
-    when(mockStub.getManagedKeys(any(), any())).thenThrow(networkError);
 
     KeymetaAdminClient client = new 
KeymetaAdminClient(TEST_UTIL.getConnection());
+    // Use reflection to set the stub
     Field stubField = KeymetaAdminClient.class.getDeclaredField("stub");
     stubField.setAccessible(true);
     stubField.set(client, mockStub);
 
+    setupFunction.apply(mockStub, networkError);
+
     IOException exception = assertThrows(IOException.class, () -> {
-      client.getManagedKeys("cust", "namespace");
+      try {
+        testFunction.apply(client);
+      } catch (RuntimeException e) {
+        throw e.getCause();
+      }
     });
 
     assertTrue(exception.getMessage().contains("Network error"));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 5b522dc9107..b04380ae450 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -155,6 +155,11 @@ public class MockNoopMasterServices implements 
MasterServices {
     return null;
   }
 
+  @Override
+  public boolean rotateSystemKeyIfChanged() {
+    return false;
+  }
+
   @Override
   public MasterCoprocessorHost getMasterCoprocessorHost() {
     return null;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 81977c24b29..402e8697fe9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -144,6 +144,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 
@@ -711,6 +712,12 @@ class MockRegionServer implements 
AdminProtos.AdminService.BlockingInterface,
     return null;
   }
 
+  @Override
+  public EmptyMsg refreshSystemKeyCache(RpcController controller, EmptyMsg 
request)
+    throws ServiceException {
+    return null;
+  }
+
   @Override
   public Connection createConnection(Configuration conf) throws IOException {
     return null;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestKeymetaAdminImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestKeymetaAdminImpl.java
index a2cb14223e1..cb5c8dc1174 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestKeymetaAdminImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestKeymetaAdminImpl.java
@@ -23,11 +23,15 @@ import static 
org.apache.hadoop.hbase.io.crypto.ManagedKeyState.DISABLED;
 import static org.apache.hadoop.hbase.io.crypto.ManagedKeyState.FAILED;
 import static org.apache.hadoop.hbase.io.crypto.ManagedKeyState.INACTIVE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -37,17 +41,22 @@ import java.security.KeyException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyData;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyProvider;
 import org.apache.hadoop.hbase.io.crypto.ManagedKeyState;
 import org.apache.hadoop.hbase.io.crypto.MockManagedKeyProvider;
+import org.apache.hadoop.hbase.keymeta.KeyManagementService;
 import org.apache.hadoop.hbase.keymeta.KeymetaAdminImpl;
 import org.apache.hadoop.hbase.keymeta.KeymetaTableAccessor;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -69,15 +78,15 @@ import org.junit.runners.Suite;
 
 @RunWith(Suite.class)
 @Suite.SuiteClasses({ TestKeymetaAdminImpl.TestWhenDisabled.class,
-  TestKeymetaAdminImpl.TestAdminImpl.class,
-  TestKeymetaAdminImpl.TestForKeyProviderNullReturn.class, })
+  TestKeymetaAdminImpl.TestAdminImpl.class, 
TestKeymetaAdminImpl.TestForKeyProviderNullReturn.class,
+  TestKeymetaAdminImpl.TestRotateSTK.class })
 @Category({ MasterTests.class, SmallTests.class })
 public class TestKeymetaAdminImpl {
 
   private static final String CUST = "cust1";
-  private static final String ENCODED_CUST = 
ManagedKeyProvider.encodeToStr(CUST.getBytes());
+  private static final byte[] CUST_BYTES = CUST.getBytes();
 
-  private final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  protected final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
 
   @Rule
   public TestName name = new TestName();
@@ -129,9 +138,9 @@ public class TestKeymetaAdminImpl {
     @Test
     public void testDisabled() throws Exception {
       assertThrows(IOException.class, () -> keymetaAdmin
-        .enableKeyManagement(ManagedKeyData.KEY_GLOBAL_CUSTODIAN, 
KEY_SPACE_GLOBAL));
-      assertThrows(IOException.class,
-        () -> keymetaAdmin.getManagedKeys(ManagedKeyData.KEY_GLOBAL_CUSTODIAN, 
KEY_SPACE_GLOBAL));
+        .enableKeyManagement(ManagedKeyData.KEY_GLOBAL_CUSTODIAN_BYTES, 
KEY_SPACE_GLOBAL));
+      assertThrows(IOException.class, () -> keymetaAdmin
+        .getManagedKeys(ManagedKeyData.KEY_GLOBAL_CUSTODIAN_BYTES, 
KEY_SPACE_GLOBAL));
     }
   }
 
@@ -164,40 +173,35 @@ public class TestKeymetaAdminImpl {
       when(keymetaAccessor.getActiveKey(CUST.getBytes(), keySpace))
         .thenReturn(managedKeyProvider.getManagedKey(CUST.getBytes(), 
keySpace));
 
-      List<ManagedKeyData> managedKeys = 
keymetaAdmin.enableKeyManagement(ENCODED_CUST, keySpace);
-      assertNotNull(managedKeys);
-      assertEquals(1, managedKeys.size());
-      assertEquals(keyState, managedKeys.get(0).getKeyState());
+      ManagedKeyData managedKey = keymetaAdmin.enableKeyManagement(CUST_BYTES, 
keySpace);
+      assertNotNull(managedKey);
+      assertEquals(keyState, managedKey.getKeyState());
       verify(keymetaAccessor).getActiveKey(CUST.getBytes(), keySpace);
 
-      keymetaAdmin.getManagedKeys(ENCODED_CUST, keySpace);
+      keymetaAdmin.getManagedKeys(CUST_BYTES, keySpace);
       verify(keymetaAccessor).getAllKeys(CUST.getBytes(), keySpace);
     }
 
     @Test
     public void testEnableKeyManagement() throws Exception {
       assumeTrue(keyState == ACTIVE);
-      List<ManagedKeyData> keys = 
keymetaAdmin.enableKeyManagement(ENCODED_CUST, "namespace1");
-      assertEquals(1, keys.size());
-      assertEquals(ManagedKeyState.ACTIVE, keys.get(0).getKeyState());
-      assertEquals(ENCODED_CUST, keys.get(0).getKeyCustodianEncoded());
-      assertEquals("namespace1", keys.get(0).getKeyNamespace());
+      ManagedKeyData managedKey = keymetaAdmin.enableKeyManagement(CUST_BYTES, 
"namespace1");
+      assertEquals(ManagedKeyState.ACTIVE, managedKey.getKeyState());
+      assertEquals(ManagedKeyProvider.encodeToStr(CUST_BYTES), 
managedKey.getKeyCustodianEncoded());
+      assertEquals("namespace1", managedKey.getKeyNamespace());
 
       // Second call should return the same keys since our mock key provider 
returns the same key
-      List<ManagedKeyData> keys2 = 
keymetaAdmin.enableKeyManagement(ENCODED_CUST, "namespace1");
-      assertEquals(1, keys2.size());
-      assertEquals(keys.get(0), keys2.get(0));
+      ManagedKeyData managedKey2 = 
keymetaAdmin.enableKeyManagement(CUST_BYTES, "namespace1");
+      assertEquals(managedKey, managedKey2);
     }
 
     @Test
     public void testEnableKeyManagementWithMultipleNamespaces() throws 
Exception {
-      List<ManagedKeyData> keys = 
keymetaAdmin.enableKeyManagement(ENCODED_CUST, "namespace1");
-      assertEquals(1, keys.size());
-      assertEquals("namespace1", keys.get(0).getKeyNamespace());
+      ManagedKeyData managedKey = keymetaAdmin.enableKeyManagement(CUST_BYTES, 
"namespace1");
+      assertEquals("namespace1", managedKey.getKeyNamespace());
 
-      List<ManagedKeyData> keys2 = 
keymetaAdmin.enableKeyManagement(ENCODED_CUST, "namespace2");
-      assertEquals(1, keys2.size());
-      assertEquals("namespace2", keys2.get(0).getKeyNamespace());
+      ManagedKeyData managedKey2 = 
keymetaAdmin.enableKeyManagement(CUST_BYTES, "namespace2");
+      assertEquals("namespace2", managedKey2.getKeyNamespace());
     }
   }
 
@@ -221,10 +225,10 @@ public class TestKeymetaAdminImpl {
       MockManagedKeyProvider managedKeyProvider =
         (MockManagedKeyProvider) Encryption.getManagedKeyProvider(conf);
       String cust = "invalidcust1";
-      String encodedCust = ManagedKeyProvider.encodeToStr(cust.getBytes());
+      byte[] custBytes = cust.getBytes();
       managedKeyProvider.setMockedKey(cust, null, keySpace);
       IOException ex = assertThrows(IOException.class,
-        () -> keymetaAdmin.enableKeyManagement(encodedCust, keySpace));
+        () -> keymetaAdmin.enableKeyManagement(custBytes, keySpace));
       assertEquals("Invalid null managed key received from key provider", 
ex.getMessage());
     }
   }
@@ -266,4 +270,158 @@ public class TestKeymetaAdminImpl {
     }
     return true;
   }
+
+  /**
+   * Test class for rotateSTK API
+   */
+  @RunWith(BlockJUnit4ClassRunner.class)
+  @Category({ MasterTests.class, SmallTests.class })
+  public static class TestRotateSTK extends TestKeymetaAdminImpl {
+    @ClassRule
+    public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRotateSTK.class);
+
+    private ServerManager mockServerManager = mock(ServerManager.class);
+    private AsyncClusterConnection mockConnection;
+    private AsyncAdmin mockAsyncAdmin;
+
+    @Override
+    public void setUp() throws Exception {
+      super.setUp();
+      mockConnection = mock(AsyncClusterConnection.class);
+      mockAsyncAdmin = mock(AsyncAdmin.class);
+      when(mockServer.getServerManager()).thenReturn(mockServerManager);
+      when(mockServer.getAsyncClusterConnection()).thenReturn(mockConnection);
+      when(mockConnection.getAdmin()).thenReturn(mockAsyncAdmin);
+    }
+
+    /**
+     * Test rotateSTK when a new key is detected. The test stubs 
rotateSystemKeyIfChanged() on the
+     * mocked server and the AsyncAdmin refresh call to cover the success 
scenario: 1.
+     * rotateSystemKeyIfChanged() on the mocked server returns true (new key 
detected) 2. Master
+     * gets list of online region servers 3. Master makes parallel RPC calls 
to all region servers
+     * 4. All region servers successfully rebuild their system key cache 5. 
Method returns true
+     */
+    @Test
+    public void testRotateSTKWithNewKey() throws Exception {
+      // Setup mocks for MasterServices
+      // Stub rotateSystemKeyIfChanged() to report a key change (returns true)
+      when(mockServer.rotateSystemKeyIfChanged()).thenReturn(true);
+
+      when(mockAsyncAdmin.refreshSystemKeyCacheOnAllServers(any()))
+        .thenReturn(CompletableFuture.completedFuture(null));
+
+      KeymetaAdminImplForTest admin = new KeymetaAdminImplForTest(mockServer, 
keymetaAccessor);
+
+      // Call rotateSTK - should return true since new key was detected
+      boolean result = admin.rotateSTK();
+
+      // Verify the result
+      assertTrue("rotateSTK should return true when new key is detected", 
result);
+
+      // Verify that rotateSystemKeyIfChanged was called
+      verify(mockServer).rotateSystemKeyIfChanged();
+      verify(mockAsyncAdmin).refreshSystemKeyCacheOnAllServers(any());
+    }
+
+    /**
+     * Test rotateSTK when no key change is detected. The test stubs 
rotateSystemKeyIfChanged() on
+     * the mocked server to cover the no-change scenario: 1. 
rotateSystemKeyIfChanged() returns
+     * false 2. Method returns false immediately without calling any region 
servers 3. No RPC calls
+     * are made to region servers
+     */
+    @Test
+    public void testRotateSTKNoChange() throws Exception {
+      // Stub rotateSystemKeyIfChanged() to report no key change (returns false)
+      when(mockServer.rotateSystemKeyIfChanged()).thenReturn(false);
+
+      KeymetaAdminImplForTest admin = new KeymetaAdminImplForTest(mockServer, 
keymetaAccessor);
+
+      // Call rotateSTK - should return false since no key change was detected
+      boolean result = admin.rotateSTK();
+
+      // Verify the result
+      assertFalse("rotateSTK should return false when no key change is 
detected", result);
+
+      // Verify that rotateSystemKeyIfChanged was called
+      verify(mockServer).rotateSystemKeyIfChanged();
+
+      // Verify that getOnlineServersList was never called (short-circuit 
behavior)
+      verify(mockServerManager, never()).getOnlineServersList();
+    }
+
+    @Test
+    public void testRotateSTKOnIOException() throws Exception {
+      when(mockServer.rotateSystemKeyIfChanged()).thenThrow(new 
IOException("test"));
+
+      KeymetaAdminImpl admin = new KeymetaAdminImpl(mockServer);
+      IOException ex = assertThrows(IOException.class, () -> 
admin.rotateSTK());
+      assertTrue("Exception message should contain 'test', but was: " + 
ex.getMessage(),
+        ex.getMessage().equals("test"));
+    }
+
+    /**
+     * Test rotateSTK when region server refresh fails.
+     */
+    @Test
+    public void testRotateSTKWithFailedServerRefresh() throws Exception {
+      // Setup mocks for MasterServices
+      // Stub rotateSystemKeyIfChanged() to report a key change (returns true)
+      when(mockServer.rotateSystemKeyIfChanged()).thenReturn(true);
+
+      CompletableFuture<Void> failedFuture = new CompletableFuture<>();
+      failedFuture.completeExceptionally(new IOException("refresh failed"));
+      
when(mockAsyncAdmin.refreshSystemKeyCacheOnAllServers(any())).thenReturn(failedFuture);
+
+      KeymetaAdminImplForTest admin = new KeymetaAdminImplForTest(mockServer, 
keymetaAccessor);
+
+      // Call rotateSTK and expect IOException
+      IOException ex = assertThrows(IOException.class, () -> 
admin.rotateSTK());
+
+      assertTrue(ex.getMessage()
+        .contains("Failed to initiate System Key cache refresh on one or more 
region servers"));
+
+      // Verify that rotateSystemKeyIfChanged was called
+      verify(mockServer).rotateSystemKeyIfChanged();
+      verify(mockAsyncAdmin).refreshSystemKeyCacheOnAllServers(any());
+    }
+
+    @Test
+    public void testRotateSTKNotOnMaster() throws Exception {
+      // Create a non-master server mock
+      Server mockRegionServer = mock(Server.class);
+      KeyManagementService mockKeyService = mock(KeyManagementService.class);
+      // Mock KeyManagementService - required by KeyManagementBase constructor
+      
when(mockRegionServer.getKeyManagementService()).thenReturn(mockKeyService);
+      when(mockKeyService.getConfiguration()).thenReturn(conf);
+      when(mockRegionServer.getConfiguration()).thenReturn(conf);
+      when(mockRegionServer.getFileSystem()).thenReturn(mockFileSystem);
+
+      KeymetaAdminImpl admin = new KeymetaAdminImpl(mockRegionServer) {
+        @Override
+        protected AsyncAdmin getAsyncAdmin(MasterServices master) {
+          throw new RuntimeException("Shouldn't be called since we are not on 
master");
+        }
+      };
+
+      IOException ex = assertThrows(IOException.class, () -> 
admin.rotateSTK());
+      assertTrue(ex.getMessage().contains("rotateSTK can only be called on 
master"));
+    }
+
+    @Test
+    public void testRotateSTKWhenDisabled() throws Exception {
+      
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MANAGED_KEYS_ENABLED_CONF_KEY,
 "false");
+
+      KeymetaAdminImpl admin = new KeymetaAdminImpl(mockServer) {
+        @Override
+        protected AsyncAdmin getAsyncAdmin(MasterServices master) {
+          throw new RuntimeException("Shouldn't be called since we are 
disabled");
+        }
+      };
+
+      IOException ex = assertThrows(IOException.class, () -> 
admin.rotateSTK());
+      assertTrue("Exception message should contain 'not enabled', but was: " + 
ex.getMessage(),
+        ex.getMessage().contains("not enabled"));
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
index ca7e20f5869..3d367d82012 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
@@ -18,11 +18,22 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -35,6 +46,11 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.EmptyMsg;
+
 /**
  * Test parts of {@link RSRpcServices}
  */
@@ -69,4 +85,119 @@ public class TestRSRpcServices {
       null, null, false, false, clientIpAndPort, userNameTest);
     LOG.info("rsh: {}", rsh);
   }
+
+  /**
+   * Test the refreshSystemKeyCache RPC method that is used to rebuild the system key cache on
+   * region servers when a system key rotation has occurred. Against a mocked
+   * {@link HRegionServer} that reports itself online, not aborted and not stopped, the call must
+   * return a non-null response and invoke {@code rebuildSystemKeyCache()} on the server.
+   */
+  @Test
+  public void testRefreshSystemKeyCache() throws Exception {
+    // Create mocks
+    HRegionServer mockServer = mock(HRegionServer.class);
+    Configuration conf = HBaseConfiguration.create();
+    FileSystem mockFs = mock(FileSystem.class);
+
+    when(mockServer.getConfiguration()).thenReturn(conf);
+    when(mockServer.isOnline()).thenReturn(true);
+    when(mockServer.isAborted()).thenReturn(false);
+    when(mockServer.isStopped()).thenReturn(false);
+    when(mockServer.isDataFileSystemOk()).thenReturn(true);
+    when(mockServer.getFileSystem()).thenReturn(mockFs);
+
+    // Create RSRpcServices
+    RSRpcServices rpcServices = new RSRpcServices(mockServer);
+
+    // Create request
+    EmptyMsg request = EmptyMsg.getDefaultInstance();
+    RpcController controller = mock(RpcController.class);
+
+    // Call the RPC method
+    EmptyMsg response = rpcServices.refreshSystemKeyCache(controller, request);
+
+    // Verify the response is not null
+    assertNotNull("Response should not be null", response);
+
+    // Verify that rebuildSystemKeyCache was called on the server
+    verify(mockServer).rebuildSystemKeyCache();
+
+    LOG.info("refreshSystemKeyCache test completed successfully");
+  }
+
+  /**
+   * Test that refreshSystemKeyCache throws ServiceException when the server is stopped. The mock
+   * still reports {@code isOnline()} as true but {@code isStopped()} as true, and the message of
+   * the exception's cause is expected to contain "stopping".
+   */
+  @Test
+  public void testRefreshSystemKeyCacheWhenServerStopped() throws Exception {
+    // Create mocks
+    HRegionServer mockServer = mock(HRegionServer.class);
+    Configuration conf = HBaseConfiguration.create();
+    FileSystem mockFs = mock(FileSystem.class);
+
+    when(mockServer.getConfiguration()).thenReturn(conf);
+    when(mockServer.isOnline()).thenReturn(true);
+    when(mockServer.isAborted()).thenReturn(false);
+    when(mockServer.isStopped()).thenReturn(true); // Server is stopped
+    when(mockServer.isDataFileSystemOk()).thenReturn(true);
+    when(mockServer.getFileSystem()).thenReturn(mockFs);
+
+    // Create RSRpcServices
+    RSRpcServices rpcServices = new RSRpcServices(mockServer);
+
+    // Create request
+    EmptyMsg request = EmptyMsg.getDefaultInstance();
+    RpcController controller = mock(RpcController.class);
+
+    // Call the RPC method and expect ServiceException
+    try {
+      rpcServices.refreshSystemKeyCache(controller, request);
+      fail("Expected ServiceException when server is stopped");
+    } catch (ServiceException e) {
+      // Expected
+      assertTrue("Exception should mention server stopping",
+        e.getCause().getMessage().contains("stopping"));
+      LOG.info("Correctly threw ServiceException when server is stopped");
+    }
+  }
+
+  /**
+   * Test that refreshSystemKeyCache throws ServiceException when rebuildSystemKeyCache fails. The
+   * mocked server throws an IOException from {@code rebuildSystemKeyCache()}; the message of the
+   * ServiceException's cause must equal that IOException's message, and the rebuild must have
+   * been attempted.
+   */
+  @Test
+  public void testRefreshSystemKeyCacheWhenRebuildFails() throws Exception {
+    // Create mocks
+    HRegionServer mockServer = mock(HRegionServer.class);
+    Configuration conf = HBaseConfiguration.create();
+    FileSystem mockFs = mock(FileSystem.class);
+
+    when(mockServer.getConfiguration()).thenReturn(conf);
+    when(mockServer.isOnline()).thenReturn(true);
+    when(mockServer.isAborted()).thenReturn(false);
+    when(mockServer.isStopped()).thenReturn(false);
+    when(mockServer.isDataFileSystemOk()).thenReturn(true);
+    when(mockServer.getFileSystem()).thenReturn(mockFs);
+
+    // Make rebuildSystemKeyCache throw IOException
+    IOException testException = new IOException("Test failure rebuilding 
cache");
+    doThrow(testException).when(mockServer).rebuildSystemKeyCache();
+
+    // Create RSRpcServices
+    RSRpcServices rpcServices = new RSRpcServices(mockServer);
+
+    // Create request
+    EmptyMsg request = EmptyMsg.getDefaultInstance();
+    RpcController controller = mock(RpcController.class);
+
+    // Call the RPC method and expect ServiceException
+    try {
+      rpcServices.refreshSystemKeyCache(controller, request);
+      fail("Expected ServiceException when rebuildSystemKeyCache fails");
+    } catch (ServiceException e) {
+      // Expected
+      assertEquals("Test failure rebuilding cache", e.getCause().getMessage());
+      LOG.info("Correctly threw ServiceException when rebuildSystemKeyCache 
fails");
+    }
+
+    // Verify that rebuildSystemKeyCache was called
+    verify(mockServer).rebuildSystemKeyCache();
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index a59b2966b89..1ea386aba92 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -999,4 +999,9 @@ public class VerifyingRSGroupAdmin implements Admin, 
Closeable {
   public boolean isReplicationPeerModificationEnabled() throws IOException {
     return admin.isReplicationPeerModificationEnabled();
   }
+
+  @Override
+  public void refreshSystemKeyCacheOnAllServers(Set<ServerName> regionServers) 
throws IOException {
+    // Pure pass-through to the wrapped admin; this override performs no checks of its own.
+    admin.refreshSystemKeyCacheOnAllServers(regionServers);
+  }
 }
diff --git a/hbase-shell/src/main/ruby/hbase/keymeta_admin.rb 
b/hbase-shell/src/main/ruby/hbase/keymeta_admin.rb
index f70abbdde55..89b42c4070b 100644
--- a/hbase-shell/src/main/ruby/hbase/keymeta_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/keymeta_admin.rb
@@ -20,6 +20,7 @@
 
 require 'java'
 java_import org.apache.hadoop.hbase.io.crypto.ManagedKeyData
+java_import org.apache.hadoop.hbase.io.crypto.ManagedKeyProvider
 java_import org.apache.hadoop.hbase.keymeta.KeymetaAdminClient
 
 module Hbase
@@ -46,11 +47,25 @@ module Hbase
       @admin.getManagedKeys(cust, namespace)
     end
 
+    # Asks the admin client to rotate the System Key (STK) and returns rotateSTK's result.
+    def rotate_stk
+      @admin.rotateSTK
+    end
+
+    # Splits a 'cust[:namespace]' string and returns [decoded custodian bytes, namespace].
+    # The namespace defaults to ManagedKeyData::KEY_SPACE_GLOBAL when it is omitted.
+    # Raises ArgumentError when the string does not have one or two ':'-separated parts, or
+    # when ManagedKeyProvider.decodeToBytes rejects the custodian with an IOException.
     def extract_cust_info(key_info)
       cust_info = key_info.split(':')
       raise(ArgumentError, 'Invalid cust:namespace format') unless [1, 
2].include?(cust_info.length)
 
-      [cust_info[0], cust_info.length > 1 ? cust_info[1] : 
ManagedKeyData::KEY_SPACE_GLOBAL]
+      custodian = cust_info[0]
+      namespace = cust_info.length > 1 ? cust_info[1] : 
ManagedKeyData::KEY_SPACE_GLOBAL
+
+      begin
+        cust_bytes = ManagedKeyProvider.decodeToBytes(custodian)
+      rescue Java::JavaIo::IOException => e
+        # Prefer the message of the underlying cause when the IOException carries one.
+        message = e.cause&.message || e.message
+        raise(ArgumentError, "Failed to decode key custodian '#{custodian}': 
#{message}")
+      end
+
+      [cust_bytes, namespace]
     end
   end
 end
diff --git a/hbase-shell/src/main/ruby/shell.rb 
b/hbase-shell/src/main/ruby/shell.rb
index 8bc4852bc3b..47dc7541b96 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -151,7 +151,7 @@ module Shell
     end
 
     def hbase_keymeta_admin
-      @keymeta_admin ||= hbase.keymeta_admin
+      # Memoized on first use; the instance variable is now named after the accessor.
+      @hbase_keymeta_admin ||= hbase.keymeta_admin
     end
 
     ##
@@ -629,6 +629,7 @@ Shell.load_command_group(
   commands: %w[
     enable_key_management
     show_key_status
+    rotate_stk
   ]
 )
 
diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb 
b/hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb
index 9a6d0422ad4..da3fe6ad8c9 100644
--- a/hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb
@@ -37,7 +37,7 @@ Example:
       end
 
       def command(key_info)
-        statuses = keymeta_admin.enable_key_management(key_info)
+        # The result is wrapped in an array before being handed to print_key_statuses.
+        # NOTE(review): presumably enable_key_management now returns a single status - confirm.
+        statuses = [keymeta_admin.enable_key_management(key_info)]
         print_key_statuses(statuses)
       end
     end
diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb 
b/hbase-shell/src/main/ruby/shell/commands/rotate_stk.rb
similarity index 58%
copy from hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb
copy to hbase-shell/src/main/ruby/shell/commands/rotate_stk.rb
index 9a6d0422ad4..2a8f4306ad5 100644
--- a/hbase-shell/src/main/ruby/shell/commands/enable_key_management.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/rotate_stk.rb
@@ -22,24 +22,31 @@ require 'shell/commands/keymeta_command_base'
 
 module Shell
   module Commands
-    # EnableKeyManagement is a class that provides a Ruby interface to enable 
key management via
-    # HBase Key Management API.
-    class EnableKeyManagement < KeymetaCommandBase
+    # RotateStk is the shell command that triggers a System Key (STK) rotation through the
+    # HBase Key Management API. It takes no arguments.
+    class RotateStk < KeymetaCommandBase
       def help
         <<-EOF
-Enable key management for a given cust:namespace (cust in Base64 format).
-If no namespace is specified, the global namespace (*) is used.
+Rotate the System Key (STK) if a new key is detected.
+This command checks for a new system key and propagates it to all region 
servers.
+Returns true if a new key was detected and rotated, false otherwise.
 
 Example:
-  hbase> enable_key_management 'cust:namespace'
-  hbase> enable_key_management 'cust'
+  hbase> rotate_stk
         EOF
       end
 
-      def command(key_info)
-        statuses = keymeta_admin.enable_key_management(key_info)
-        print_key_statuses(statuses)
+      # Calls keymeta_admin.rotate_stk, prints a single row describing the outcome (rotated vs.
+      # no change detected) and returns the value rotate_stk produced.
+      def command
+        result = keymeta_admin.rotate_stk
+        if result
+          formatter.row(['System Key rotation was performed successfully and 
cache was refreshed ' \
+            'on all region servers'])
+        else
+          formatter.row(['No System Key change was detected'])
+        end
+        result
       end
     end
   end
 end
+
diff --git 
a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestKeymetaMockProviderShell.java
 
b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestKeymetaMockProviderShell.java
new file mode 100644
index 00000000000..cc4aabe4ff4
--- /dev/null
+++ 
b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestKeymetaMockProviderShell.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.keymeta.ManagedKeyTestBase;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.jruby.embed.ScriptingContainer;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Drives the Ruby shell test suites selected by {@link #getSuitePattern()} through the
+ * {@link RubyShellTest} helpers, on top of the set-up inherited from {@link ManagedKeyTestBase}.
+ */
+@Category({ ClientTests.class, IntegrationTests.class })
+public class TestKeymetaMockProviderShell extends ManagedKeyTestBase 
implements RubyShellTest {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestKeymetaMockProviderShell.class);
+
+  private final ScriptingContainer jruby = new ScriptingContainer();
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    // Uncomment the settings below to step through in a debugger without hitting timeouts.
+    // final Configuration conf = TEST_UTIL.getConfiguration();
+    // conf.set("zookeeper.session.timeout", "6000000");
+    // conf.set("hbase.rpc.timeout", "6000000");
+    // conf.set("hbase.rpc.read.timeout", "6000000");
+    // conf.set("hbase.rpc.write.timeout", "6000000");
+    // conf.set("hbase.client.operation.timeout", "6000000");
+    // conf.set("hbase.client.scanner.timeout.period", "6000000");
+    // conf.set("hbase.ipc.client.socket.timeout.connect", "6000000");
+    // conf.set("hbase.ipc.client.socket.timeout.read", "6000000");
+    // conf.set("hbase.ipc.client.socket.timeout.write", "6000000");
+    // conf.set("hbase.master.start.timeout.localHBaseCluster", "6000000");
+    // conf.set("hbase.master.init.timeout.localHBaseCluster", "6000000");
+    // conf.set("hbase.client.sync.wait.timeout.msec", "6000000");
+    // conf.set("hbase.client.retries.number", "1000");
+    RubyShellTest.setUpConfig(this);
+    super.setUp();
+    RubyShellTest.setUpJRubyRuntime(this);
+    RubyShellTest.doTestSetup(this);
+    // Expose this test instance to the Ruby scripts under the name $TEST.
+    jruby.put("$TEST", this);
+  }
+
+  @Override
+  public HBaseTestingUtil getTEST_UTIL() {
+    return TEST_UTIL;
+  }
+
+  @Override
+  public ScriptingContainer getJRuby() {
+    return jruby;
+  }
+
+  @Override
+  public String getSuitePattern() {
+    return "**/*_keymeta_mock_provider_test.rb";
+  }
+
+  @Test
+  public void testRunShellTests() throws Exception {
+    RubyShellTest.testRunShellTests(this);
+  }
+}
diff --git a/hbase-shell/src/test/ruby/shell/admin_keymeta_test.rb 
b/hbase-shell/src/test/ruby/shell/admin_keymeta_test.rb
index 9f3048ab599..83a73842711 100644
--- a/hbase-shell/src/test/ruby/shell/admin_keymeta_test.rb
+++ b/hbase-shell/src/test/ruby/shell/admin_keymeta_test.rb
@@ -40,18 +40,18 @@ module Hbase
       test_key_management($CUST1_ENCODED, 'test_namespace')
       test_key_management($GLOB_CUST_ENCODED, '*')
 
-      puts "Testing that cluster can be restarted when key management is 
enabled"
-      $TEST.restartMiniCluster()
-      puts "Cluster restarted, testing key management again"
+      puts 'Testing that cluster can be restarted when key management is 
enabled'
+      $TEST.restartMiniCluster
+      puts 'Cluster restarted, testing key management again'
       setup_hbase
       test_key_management($GLOB_CUST_ENCODED, '*')
-      puts "Key management test complete"
+      puts 'Key management test complete'
     end
 
     def test_key_management(cust, namespace)
       # Repeat the enable twice in a loop and ensure multiple enables succeed 
and return the
       # same output.
-      2.times do |i|
+      2.times do
         cust_and_namespace = "#{cust}:#{namespace}"
         output = capture_stdout { @shell.command('enable_key_management', 
cust_and_namespace) }
         puts "enable_key_management output: #{output}"
@@ -62,5 +62,16 @@ module Hbase
         assert(output.include?('1 row(s)'))
       end
     end
+
+    # '!!!' stands in for a custodian that cannot be decoded (presumably because it is not valid
+    # Base64 - confirm). Both commands must raise ArgumentError; the 'Failed to decode key
+    # custodian' message is asserted for show_key_status only.
+    define_test 'Decode failure raises friendly error' do
+      assert_raises(ArgumentError) do
+        @shell.command('enable_key_management', '!!!:namespace')
+      end
+
+      error = assert_raises(ArgumentError) do
+        @shell.command('show_key_status', '!!!:namespace')
+      end
+      assert_match(/Failed to decode key custodian/, error.message)
+    end
   end
 end
diff --git a/hbase-shell/src/test/ruby/shell/encrypted_table_keymeta_test.rb 
b/hbase-shell/src/test/ruby/shell/encrypted_table_keymeta_test.rb
index 2562a64779e..35ad85785e0 100644
--- a/hbase-shell/src/test/ruby/shell/encrypted_table_keymeta_test.rb
+++ b/hbase-shell/src/test/ruby/shell/encrypted_table_keymeta_test.rb
@@ -46,7 +46,7 @@ module Hbase
 
     def setup
       setup_hbase
-      @test_table = 'enctest'+Time.now.to_i.to_s
+      # The table name is suffixed with the current epoch seconds.
+      @test_table = "enctest#{Time.now.to_i}"
       @connection = $TEST_CLUSTER.connection
     end
 
@@ -54,14 +54,17 @@ module Hbase
       # Custodian is currently not supported, so this will end up falling back 
to local key
       # generation.
       test_table_put_get_with_encryption($CUST1_ENCODED, '*',
-        { 'NAME' => 'f', 'ENCRYPTION' => 'AES' }, true)
+                                         { 'NAME' => 'f', 'ENCRYPTION' => 
'AES' },
+                                         true)
     end
 
     define_test 'Test table with custom namespace attribute in Column Family' 
do
-      custom_namespace = "test_global_namespace"
-      test_table_put_get_with_encryption($GLOB_CUST_ENCODED, custom_namespace,
+      custom_namespace = 'test_global_namespace'
+      # The final argument is fallback_scenario; false means the active managed key is expected.
+      test_table_put_get_with_encryption(
+        $GLOB_CUST_ENCODED, custom_namespace,
         { 'NAME' => 'f', 'ENCRYPTION' => 'AES', 'ENCRYPTION_KEY_NAMESPACE' => 
custom_namespace },
-        false)
+        false
+      )
     end
 
     def test_table_put_get_with_encryption(cust, namespace, table_attrs, 
fallback_scenario)
@@ -88,19 +91,19 @@ module Hbase
       assert_not_nil(hfile_info)
       live_trailer = hfile_info.getTrailer
       assert_trailer(live_trailer)
-      assert_equal(namespace, live_trailer.getKeyNamespace())
+      assert_equal(namespace, live_trailer.getKeyNamespace)
 
       # When active key is supposed to be used, we can valiate the key bytes 
in the context against
       # the actual key from provider.
-      if !fallback_scenario
-        encryption_context = 
hfile_info.getHFileContext().getEncryptionContext()
+      unless fallback_scenario
+        encryption_context = hfile_info.getHFileContext.getEncryptionContext
         assert_not_nil(encryption_context)
-        assert_not_nil(encryption_context.getKeyBytes())
+        assert_not_nil(encryption_context.getKeyBytes)
         key_provider = 
Encryption.getManagedKeyProvider($TEST_CLUSTER.getConfiguration)
         key_data = 
key_provider.getManagedKey(ManagedKeyProvider.decodeToBytes(cust), namespace)
         assert_not_nil(key_data)
-        assert_equal(namespace, key_data.getKeyNamespace())
-        assert_equal(key_data.getTheKey().getEncoded(), 
encryption_context.getKeyBytes())
+        assert_equal(namespace, key_data.getKeyNamespace)
+        assert_equal(key_data.getTheKey.getEncoded, 
encryption_context.getKeyBytes)
       end
 
       ## Disable table to ensure that the stores are not cached.
diff --git 
a/hbase-shell/src/test/ruby/shell/key_provider_keymeta_migration_test.rb 
b/hbase-shell/src/test/ruby/shell/key_provider_keymeta_migration_test.rb
index 978ee79e865..8a179c8af2f 100644
--- a/hbase-shell/src/test/ruby/shell/key_provider_keymeta_migration_test.rb
+++ b/hbase-shell/src/test/ruby/shell/key_provider_keymeta_migration_test.rb
@@ -98,7 +98,7 @@ module Hbase
           expected_namespace: { 'f' => 'shared-global-key' }
         },
         @table_cf_keys => {
-          cfs: ['cf1', 'cf2'],
+          cfs: %w[cf1 cf2],
           expected_namespace: {
             'cf1' => "#{@table_cf_keys}/cf1",
             'cf2' => "#{@table_cf_keys}/cf2"
@@ -106,22 +106,21 @@ module Hbase
         }
       }
 
-
       # Setup initial KeyStoreKeyProvider
       setup_old_key_provider
-      puts "  >> Starting Cluster"
-      $TEST.startMiniCluster()
-      puts "  >> Cluster started"
+      puts '  >> Starting Cluster'
+      $TEST.startMiniCluster
+      puts '  >> Cluster started'
 
       setup_hbase
     end
 
     define_test 'Test complete key provider migration' do
-      puts "\n=== Starting Key Provider Migration Test ==="
+      puts "\n=== Starting Key Provider Migration Test ==="
 
       # Step 1-3: Setup old provider and create tables
       create_test_tables
-      puts "\n--- Validating initial table operations ---"
+      puts "\n--- Validating initial table operations ---"
       validate_pre_migration_operations(false)
 
       # Step 4: Setup new provider and restart
@@ -134,13 +133,13 @@ module Hbase
       # Step 6: Cleanup and final validation
       cleanup_old_provider_and_validate
 
-      puts "\n=== Migration Test Completed Successfully ==="
+      puts "\n=== Migration Test Completed Successfully ==="
     end
 
     private
 
     def setup_old_key_provider
-      puts "\n--- Setting up old KeyStoreKeyProvider ---"
+      puts "\n--- Setting up old KeyStoreKeyProvider ---"
 
       # Use proper test directory (similar to 
KeymetaTestUtils.setupTestKeyStore)
       test_data_dir = 
$TEST_CLUSTER.getDataTestDir("old_keystore_#{@test_timestamp}").toString
@@ -150,22 +149,23 @@ module Hbase
 
       # Create keystore with only the master key
       # ENCRYPTION_KEY attributes generate their own keys and don't use 
keystore entries
-      create_keystore(@old_keystore_file, {
-        @master_key_alias => generate_key(@master_key_alias)
-      })
+      create_keystore(@old_keystore_file, { @master_key_alias => 
generate_key(@master_key_alias) })
 
       # Configure old KeyStoreKeyProvider
-      provider_uri = 
"jceks://#{File.expand_path(@old_keystore_file)}?password=#{@keystore_password}"
+      provider_uri = "jceks://#{File.expand_path(@old_keystore_file)}?" \
+        "password=#{@keystore_password}"
       
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_KEYPROVIDER_CONF_KEY,
-                                        KeyStoreKeyProvider.java_class.name)
-      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_KEYPROVIDER_PARAMETERS_KEY,
 provider_uri)
-      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MASTERKEY_NAME_CONF_KEY, 
@master_key_alias)
+                                         KeyStoreKeyProvider.java_class.name)
+      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_KEYPROVIDER_PARAMETERS_KEY,
+                                         provider_uri)
+      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MASTERKEY_NAME_CONF_KEY,
+                                         @master_key_alias)
 
       puts "  >> Old KeyStoreKeyProvider configured with keystore: 
#{@old_keystore_file}"
     end
 
     def create_test_tables
-      puts "\n--- Creating test tables ---"
+      puts "\n--- Creating test tables ---"
 
       # 1. Table without encryption
       command(:create, @table_no_encryption, { 'NAME' => 'f' })
@@ -177,17 +177,17 @@ module Hbase
 
       # 3. Table with table-level key
       command(:create, @table_table_key, { 'NAME' => 'f', 'ENCRYPTION' => 
'AES',
-                                          'ENCRYPTION_KEY' => @table_key_alias 
})
+                                           'ENCRYPTION_KEY' => 
@table_key_alias })
       puts "  >> Created table #{@table_table_key} with table-level key"
 
       # 4. First table with shared key
       command(:create, @table_shared_key1, { 'NAME' => 'f', 'ENCRYPTION' => 
'AES',
-                                            'ENCRYPTION_KEY' => 
@shared_key_alias })
+                                             'ENCRYPTION_KEY' => 
@shared_key_alias })
       puts "  >> Created table #{@table_shared_key1} with shared key"
 
       # 5. Second table with shared key
       command(:create, @table_shared_key2, { 'NAME' => 'f', 'ENCRYPTION' => 
'AES',
-                                            'ENCRYPTION_KEY' => 
@shared_key_alias })
+                                             'ENCRYPTION_KEY' => 
@shared_key_alias })
       puts "  >> Created table #{@table_shared_key2} with shared key"
 
       # 6. Table with column family specific keys
@@ -199,10 +199,10 @@ module Hbase
 
     def validate_pre_migration_operations(is_key_management_enabled)
       @tables_metadata.each do |table_name, metadata|
-        puts "  >> test_table_operations on table: #{table_name} with CFs: 
#{metadata[:cfs].join(', ')}"
-        if metadata[:no_encryption]
-          next
-        end
+        puts "  >> test_table_operations on table: #{table_name} with CFs: " \
+          "#{metadata[:cfs].join(', ')}"
+        next if metadata[:no_encryption]
+
         test_table_operations(table_name, metadata[:cfs])
         check_hfile_trailers_pre_migration(table_name, metadata[:cfs], 
is_key_management_enabled)
       end
@@ -230,14 +230,14 @@ module Hbase
         get_result = test_table.table.get(Get.new(Bytes.toBytes('row1')))
         assert_false(get_result.isEmpty)
         assert_equal('value1',
-                    Bytes.toString(get_result.getValue(Bytes.toBytes(cf), 
Bytes.toBytes('col1'))))
+                     Bytes.toString(get_result.getValue(Bytes.toBytes(cf), 
Bytes.toBytes('col1'))))
       end
 
       puts "    >> Operations validated for #{table_name}"
     end
 
     def setup_new_key_provider
-      puts "\n--- Setting up new ManagedKeyStoreKeyProvider ---"
+      puts "\n--- Setting up new ManagedKeyStoreKeyProvider ---"
 
       # Use proper test directory (similar to 
KeymetaTestUtils.setupTestKeyStore)
       test_data_dir = 
$TEST_CLUSTER.getDataTestDir("new_keystore_#{@test_timestamp}").toString
@@ -252,69 +252,85 @@ module Hbase
       create_keystore(@new_keystore_file, migrated_keys)
 
       # Configure ManagedKeyStoreKeyProvider
-      provider_uri = 
"jceks://#{File.expand_path(@new_keystore_file)}?password=#{@keystore_password}"
+      provider_uri = "jceks://#{File.expand_path(@new_keystore_file)}?" \
+        "password=#{@keystore_password}"
       
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MANAGED_KEYS_ENABLED_CONF_KEY,
 'true')
       
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MANAGED_KEYPROVIDER_CONF_KEY,
-                                        
ManagedKeyStoreKeyProvider.java_class.name)
-      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MANAGED_KEYPROVIDER_PARAMETERS_KEY,
 provider_uri)
-      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MANAGED_KEY_STORE_SYSTEM_KEY_NAME_CONF_KEY,
-                                        'system_key')
+                                         
ManagedKeyStoreKeyProvider.java_class.name)
+      
$TEST_CLUSTER.getConfiguration.set(HConstants::CRYPTO_MANAGED_KEYPROVIDER_PARAMETERS_KEY,
+                                         provider_uri)
+      $TEST_CLUSTER.getConfiguration.set(
+        HConstants::CRYPTO_MANAGED_KEY_STORE_SYSTEM_KEY_NAME_CONF_KEY,
+        'system_key'
+      )
 
       # Setup key configurations for ManagedKeyStoreKeyProvider
       # Shared key configuration
       $TEST_CLUSTER.getConfiguration.set(
         
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.shared-global-key.alias",
-        'shared_global_key')
+        'shared_global_key'
+      )
       $TEST_CLUSTER.getConfiguration.setBoolean(
-        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.shared-global-key.active",
 true)
+        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.shared-global-key.active",
+        true
+      )
 
       # Table-level key configuration - let system determine namespace 
automatically
       $TEST_CLUSTER.getConfiguration.set(
         
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_table_key}.alias",
-        "#{@table_table_key}_key")
+        "#{@table_table_key}_key"
+      )
       $TEST_CLUSTER.getConfiguration.setBoolean(
-        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_table_key}.active",
 true)
+        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_table_key}.active",
+        true
+      )
 
       # CF-level key configurations - let system determine namespace 
automatically
       $TEST_CLUSTER.getConfiguration.set(
         
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_cf_keys}/cf1.alias",
-        "#{@table_cf_keys}_cf1_key")
+        "#{@table_cf_keys}_cf1_key"
+      )
       $TEST_CLUSTER.getConfiguration.setBoolean(
-        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_cf_keys}/cf1.active",
 true)
+        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_cf_keys}/cf1.active",
+        true
+      )
 
       $TEST_CLUSTER.getConfiguration.set(
         
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_cf_keys}/cf2.alias",
-        "#{@table_cf_keys}_cf2_key")
+        "#{@table_cf_keys}_cf2_key"
+      )
       $TEST_CLUSTER.getConfiguration.setBoolean(
-        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_cf_keys}/cf2.active",
 true)
+        
"hbase.crypto.managed_key_store.cust.#{$GLOB_CUST_ENCODED}.#{@table_cf_keys}/cf2.active",
+        true
+      )
 
       # Enable KeyMeta coprocessor
       $TEST_CLUSTER.getConfiguration.set('hbase.coprocessor.master.classes',
-                                        KeymetaServiceEndpoint.java_class.name)
+                                         
KeymetaServiceEndpoint.java_class.name)
 
-      puts "  >> New ManagedKeyStoreKeyProvider configured"
+      puts '  >> New ManagedKeyStoreKeyProvider configured'
     end
 
     def restart_cluster_and_validate
-      puts "\n--- Restarting cluster with managed key store key provider ---"
+      # Banners that start with \n stay double-quoted: single quotes would print the backslash
+      # and the n literally instead of a blank line.
+      puts "\n--- Restarting cluster with managed key store key provider ---"
 
       $TEST.restartMiniCluster(KeymetaTableAccessor::KEY_META_TABLE_NAME)
-      puts "  >> Cluster restarted with ManagedKeyStoreKeyProvider"
+      puts '  >> Cluster restarted with ManagedKeyStoreKeyProvider'
       setup_hbase
 
       # Validate key management service is functional
       output = capture_stdout { command(:show_key_status, 
"#{$GLOB_CUST_ENCODED}:*") }
       assert(output.include?('0 row(s)'), "Expected 0 rows from 
show_key_status, got: #{output}")
-      #assert(output.include?(' FAILED '), "Expected FAILED status for 
show_key_status, got: #{output}")
-      puts "  >> Key management service is functional"
+      puts '  >> Key management service is functional'
 
       # Test operations still work and check HFile trailers
-      puts "\n--- Validating operations after restart ---"
+      puts "\n--- Validating operations after restart ---"
       validate_pre_migration_operations(true)
     end
 
     def check_hfile_trailers_pre_migration(table_name, column_families, 
is_key_management_enabled)
-      puts "    >> Checking HFile trailers for #{table_name} with CFs: 
#{column_families.join(', ')}"
+      puts "    >> Checking HFile trailers for #{table_name} with CFs: " \
+        "#{column_families.join(', ')}"
 
       column_families.each do |cf_name|
         validate_hfile_trailer(table_name, cf_name, false, 
is_key_management_enabled, false)
@@ -322,7 +338,7 @@ module Hbase
     end
 
     def migrate_tables_step_by_step
-      puts "\n--- Performing step-by-step table migration ---"
+      puts "\n--- Performing step-by-step table migration ---"
 
       # Migrate shared key tables first
       migrate_shared_key_tables
@@ -335,74 +351,80 @@ module Hbase
     end
 
     def migrate_shared_key_tables
-      puts "\n--- Migrating shared key tables ---"
+      puts '\n--- Migrating shared key tables ---'
 
       # Enable key management for shared global key
       cust_and_namespace = "#{$GLOB_CUST_ENCODED}:shared-global-key"
       output = capture_stdout { command(:enable_key_management, 
cust_and_namespace) }
       assert(output.include?("#{$GLOB_CUST_ENCODED} shared-global-key ACTIVE"),
-            "Expected ACTIVE status for shared key, got: #{output}")
-      puts "  >> Enabled key management for shared global key"
+             "Expected ACTIVE status for shared key, got: #{output}")
+      puts '  >> Enabled key management for shared global key'
 
       # Migrate first shared key table
-      migrate_table_to_managed_key(@table_shared_key1, 'f', 
'shared-global-key', true)
+      migrate_table_to_managed_key(@table_shared_key1, 'f', 
'shared-global-key',
+                                   use_namespace_attribute: true)
 
       # Migrate second shared key table
-      migrate_table_to_managed_key(@table_shared_key2, 'f', 
'shared-global-key', true)
+      migrate_table_to_managed_key(@table_shared_key2, 'f', 
'shared-global-key',
+                                   use_namespace_attribute: true)
     end
 
     def migrate_table_level_key
-      puts "\n--- Migrating table-level key ---"
+      puts '\n--- Migrating table-level key ---'
 
       # Enable key management for table namespace
       cust_and_namespace = "#{$GLOB_CUST_ENCODED}:#{@table_table_key}"
       output = capture_stdout { command(:enable_key_management, 
cust_and_namespace) }
       assert(output.include?("#{$GLOB_CUST_ENCODED} #{@table_table_key} 
ACTIVE"),
-            "Expected ACTIVE status for table key, got: #{output}")
-      puts "  >> Enabled key management for table-level key"
+             "Expected ACTIVE status for table key, got: #{output}")
+      puts '  >> Enabled key management for table-level key'
 
       # Migrate the table - no namespace attribute, let system auto-determine
-      migrate_table_to_managed_key(@table_table_key, 'f', @table_table_key, 
false)
+      migrate_table_to_managed_key(@table_table_key, 'f', @table_table_key)
     end
 
     def migrate_cf_level_keys
-      puts "\n--- Migrating CF-level keys ---"
+      puts '\n--- Migrating CF-level keys ---'
 
       # Enable key management for CF1
       cf1_namespace = "#{@table_cf_keys}/cf1"
       cust_and_namespace = "#{$GLOB_CUST_ENCODED}:#{cf1_namespace}"
       output = capture_stdout { command(:enable_key_management, 
cust_and_namespace) }
       assert(output.include?("#{$GLOB_CUST_ENCODED} #{cf1_namespace} ACTIVE"),
-            "Expected ACTIVE status for CF1 key, got: #{output}")
-      puts "  >> Enabled key management for CF1"
+             "Expected ACTIVE status for CF1 key, got: #{output}")
+      puts '  >> Enabled key management for CF1'
 
       # Enable key management for CF2
       cf2_namespace = "#{@table_cf_keys}/cf2"
       cust_and_namespace = "#{$GLOB_CUST_ENCODED}:#{cf2_namespace}"
       output = capture_stdout { command(:enable_key_management, 
cust_and_namespace) }
       assert(output.include?("#{$GLOB_CUST_ENCODED} #{cf2_namespace} ACTIVE"),
-            "Expected ACTIVE status for CF2 key, got: #{output}")
-      puts "  >> Enabled key management for CF2"
+             "Expected ACTIVE status for CF2 key, got: #{output}")
+      puts '  >> Enabled key management for CF2'
 
       # Migrate CF1
-      migrate_table_to_managed_key(@table_cf_keys, 'cf1', cf1_namespace, false)
+      migrate_table_to_managed_key(@table_cf_keys, 'cf1', cf1_namespace)
 
       # Migrate CF2
-      migrate_table_to_managed_key(@table_cf_keys, 'cf2', cf2_namespace, false)
+      migrate_table_to_managed_key(@table_cf_keys, 'cf2', cf2_namespace)
     end
 
-    def migrate_table_to_managed_key(table_name, cf_name, namespace, 
use_namespace_attribute = false)
+    def migrate_table_to_managed_key(table_name, cf_name, namespace,
+                                     use_namespace_attribute: false)
       puts "    >> Migrating table #{table_name}, CF #{cf_name} to namespace 
#{namespace}"
 
-      # Use atomic alter operation to remove ENCRYPTION_KEY and optionally add 
ENCRYPTION_KEY_NAMESPACE
+      # Use atomic alter operation to remove ENCRYPTION_KEY and optionally add
+      # ENCRYPTION_KEY_NAMESPACE
       if use_namespace_attribute
         # For shared key tables: remove ENCRYPTION_KEY and add 
ENCRYPTION_KEY_NAMESPACE atomically
         command(:alter, table_name,
-                { 'NAME' => cf_name, 'CONFIGURATION' => {'ENCRYPTION_KEY' => 
'', 'ENCRYPTION_KEY_NAMESPACE' => namespace }})
+                { 'NAME' => cf_name,
+                  'CONFIGURATION' => { 'ENCRYPTION_KEY' => '',
+                                       'ENCRYPTION_KEY_NAMESPACE' => namespace 
} })
       else
         # For table/CF level keys: just remove ENCRYPTION_KEY, let system 
auto-determine namespace
         command(:alter, table_name,
-                { 'NAME' => cf_name, 'CONFIGURATION' => {'ENCRYPTION_KEY' => 
'' }})
+                { 'NAME' => cf_name, 'CONFIGURATION' => { 'ENCRYPTION_KEY' => 
'' } })
       end
 
       puts "      >> Altered #{table_name} CF #{cf_name} to use namespace 
#{namespace}"
@@ -415,7 +437,7 @@ module Hbase
       sleep(5)
 
       # Scan all existing data to verify accessibility
-      scan_and_validate_table(table_name, [cf_name])
+      scan_and_validate_table(table_name)
 
       # Add new data
       test_table = table(table_name)
@@ -428,8 +450,7 @@ module Hbase
       puts "      >> Migration completed for #{table_name} CF #{cf_name}"
     end
 
-
-    def scan_and_validate_table(table_name, column_families)
+    def scan_and_validate_table(table_name)
       puts "      >> Scanning and validating existing data in #{table_name}"
 
       test_table = table(table_name)
@@ -443,7 +464,7 @@ module Hbase
       end
       scanner.close
 
-      assert(row_count > 0, "Expected to find existing data in #{table_name}")
+      assert(row_count.positive?, "Expected to find existing data in 
#{table_name}")
       puts "        >> Found #{row_count} rows, all accessible"
     end
 
@@ -459,17 +480,22 @@ module Hbase
       regions.each do |region|
         region.getStores.each do |store|
           next unless store.getColumnFamilyName == cf_name
-          puts "      >> store file count for CF: #{cf_name} in table: 
#{table_name} is #{store.getStorefiles.size}"
+
+          puts "      >> store file count for CF: #{cf_name} in table: 
#{table_name} is " \
+            "#{store.getStorefiles.size}"
           if is_compacted
             assert_equal(1, store.getStorefiles.size)
           else
-            assert_true(store.getStorefiles.size > 0)
+            assert_true(!store.getStorefiles.empty?)
           end
           store.getStorefiles.each do |storefile|
-            puts "      >> Checking HFile trailer for storefile: 
#{storefile.getPath.getName} with sequence id: #{storefile.getMaxSequenceId} 
against max sequence id of store: #{store.getMaxSequenceId.getAsLong}"
+            puts "      >> Checking HFile trailer for storefile: 
#{storefile.getPath.getName} " \
+              "with sequence id: #{storefile.getMaxSequenceId} against max 
sequence id of " \
+              "store: #{store.getMaxSequenceId.getAsLong}"
             # The flush would have created new HFiles, but the old would still 
be there
             # so we need to make sure to check the latest store only.
             next unless storefile.getMaxSequenceId == 
store.getMaxSequenceId.getAsLong
+
             store_file_info = storefile.getFileInfo
             next unless store_file_info
 
@@ -493,16 +519,15 @@ module Hbase
               puts "        >> Trailer validation passed - namespace: 
#{trailer.getKeyNamespace}"
             else
               assert_nil(trailer.getKeyNamespace)
-              puts "        >> Trailer validation passed - using legacy key 
format"
+              puts '        >> Trailer validation passed - using legacy key 
format'
             end
           end
         end
       end
     end
 
-
     def cleanup_old_provider_and_validate
-      puts "\n--- Cleaning up old key provider and final validation ---"
+      puts '\n--- Cleaning up old key provider and final validation ---'
 
       # Remove old KeyProvider configurations
       
$TEST_CLUSTER.getConfiguration.unset(HConstants::CRYPTO_KEYPROVIDER_CONF_KEY)
@@ -511,11 +536,11 @@ module Hbase
 
       # Remove old keystore
       FileUtils.rm_rf(@old_keystore_file) if 
File.directory?(@old_keystore_file)
-      puts "  >> Removed old keystore and configuration"
+      puts '  >> Removed old keystore and configuration'
 
       # Restart cluster
       $TEST.restartMiniCluster(KeymetaTableAccessor::KEY_META_TABLE_NAME)
-      puts "  >> Cluster restarted without old key provider"
+      puts '  >> Cluster restarted without old key provider'
       setup_hbase
 
       # Validate all data is still accessible
@@ -526,32 +551,31 @@ module Hbase
     end
 
     def validate_all_tables_final
-      puts "\n--- Final validation - scanning all tables ---"
+      puts '\n--- Final validation - scanning all tables ---'
 
       @tables_metadata.each do |table_name, metadata|
-        if metadata[:no_encryption]
-          next
-        end
+        next if metadata[:no_encryption]
+
         puts "  >> Final validation for table: #{table_name} with CFs: 
#{metadata[:cfs].join(', ')}"
-        scan_and_validate_table(table_name, metadata[:cfs])
+        scan_and_validate_table(table_name)
         puts "    >> #{table_name} - all data accessible"
       end
     end
 
     def perform_major_compaction_and_validate
-      puts "\n--- Performing major compaction and final validation ---"
+      puts '\n--- Performing major compaction and final validation ---'
 
       $TEST_CLUSTER.compact(true)
 
       @tables_metadata.each do |table_name, metadata|
-        if metadata[:no_encryption]
-          next
-        end
-        puts "  >> Validating post-compaction HFiles for table: #{table_name} 
with CFs: #{metadata[:cfs].join(', ')}"
+        next if metadata[:no_encryption]
+
+        puts "  >> Validating post-compaction HFiles for table: #{table_name} 
with " \
+          "CFs: #{metadata[:cfs].join(', ')}"
         metadata[:cfs].each do |cf_name|
           # When using random key from system key, there is no namespace
-          #next if metadata[:expected_namespace][cf_name] == '*'
-          validate_hfile_trailer(table_name, cf_name, true, true, true, 
metadata[:expected_namespace][cf_name])
+          validate_hfile_trailer(table_name, cf_name, true, true, true,
+                                 metadata[:expected_namespace][cf_name])
         end
       end
     end
@@ -559,7 +583,7 @@ module Hbase
     # Utility methods
 
     def extract_and_unwrap_keys_from_tables
-      puts "    >> Extracting and unwrapping keys from encrypted tables"
+      puts '    >> Extracting and unwrapping keys from encrypted tables'
 
       keys = {}
 
@@ -601,9 +625,9 @@ module Hbase
 
       # Use EncryptionUtil.unwrapKey with master key alias as subject
       unwrapped_key = EncryptionUtil.unwrapKey($TEST_CLUSTER.getConfiguration,
-                                              @master_key_alias, 
wrapped_key_bytes)
+                                               @master_key_alias, 
wrapped_key_bytes)
 
-      return unwrapped_key.getEncoded
+      unwrapped_key.getEncoded
     end
 
     def generate_key(alias_name)
@@ -618,7 +642,7 @@ module Hbase
       key_entries.each do |alias_name, key_bytes|
         secret_key = SecretKeySpec.new(key_bytes, 'AES')
         store.setEntry(alias_name, KeyStore::SecretKeyEntry.new(secret_key),
-                      KeyStore::PasswordProtection.new(password_chars))
+                       KeyStore::PasswordProtection.new(password_chars))
       end
 
       fos = FileOutputStream.new(keystore_path)
@@ -629,10 +653,9 @@ module Hbase
       end
     end
 
-
     def teardown
       # Cleanup temporary test directories (keystore files will be cleaned up 
with the directories)
-      test_base_dir = $TEST_CLUSTER.getDataTestDir().toString
+      test_base_dir = $TEST_CLUSTER.getDataTestDir.toString
       Dir.glob(File.join(test_base_dir, "*keystore_#{@test_timestamp}*")).each 
do |dir|
         FileUtils.rm_rf(dir) if File.directory?(dir)
       end
diff --git 
a/hbase-shell/src/test/ruby/shell/rotate_stk_keymeta_mock_provider_test.rb 
b/hbase-shell/src/test/ruby/shell/rotate_stk_keymeta_mock_provider_test.rb
new file mode 100644
index 00000000000..77a2a339552
--- /dev/null
+++ b/hbase-shell/src/test/ruby/shell/rotate_stk_keymeta_mock_provider_test.rb
@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'hbase_shell'
+require 'stringio'
+require 'hbase_constants'
+require 'hbase/hbase'
+require 'hbase/table'
+
+java_import org.apache.hadoop.hbase.io.crypto.Encryption
+java_import org.apache.hadoop.hbase.io.crypto.MockManagedKeyProvider
+module Hbase
+  # Shell test for rotate_stk: covers the no-change case and a forced key rotation
+  class RotateSTKKeymetaTest < Test::Unit::TestCase
+    include TestHelpers
+
+    def setup
+      setup_hbase
+    end
+
+    define_test 'Test rotate_stk command' do
+      puts 'Testing rotate_stk command'
+
+      # No key change yet, so rotate_stk should report that no rotation was performed
+      output = capture_stdout { @shell.command(:rotate_stk) }
+      puts "rotate_stk output: #{output}"
+      assert(output.include?('No System Key change was detected'),
+             "Expected output to contain rotation status message, but got: 
#{output}")
+
+      key_provider = 
Encryption.getManagedKeyProvider($TEST_CLUSTER.getConfiguration)
+      # Once we enable multikeyGenMode on MockManagedKeyProvider, every call 
should return a new key
+      # which should trigger a rotation.
+      key_provider.setMultikeyGenMode(true)
+      output = capture_stdout { @shell.command(:rotate_stk) }
+      puts "rotate_stk output: #{output}"
+      assert(output.include?('System Key rotation was performed successfully 
and cache was ' \
+                             'refreshed on all region servers'),
+             "Expected output to contain rotation status message, but got: 
#{output}")
+    end
+  end
+end
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 3d5a7e502e0..daa67297e25 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -1376,4 +1376,10 @@ public class ThriftAdmin implements Admin {
     throw new NotImplementedException(
       "isReplicationPeerModificationEnabled not supported in ThriftAdmin");
   }
+
+  @Override
+  public void refreshSystemKeyCacheOnAllServers(Set<ServerName> regionServers) 
throws IOException {
+    throw new NotImplementedException(
+      "refreshSystemKeyCacheOnAllServers not supported in ThriftAdmin");
+  }
 }


Reply via email to