This is an automated email from the ASF dual-hosted git repository.
jojochuang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 62729b06d70 HDFS-17899. Handle InvalidEncryptionKeyException in
Balancer Dispatcher, SPS BlockDispatcher and DataNode DataTransfer (#8383)
62729b06d70 is described below
commit 62729b06d7062e6739f8383af942ecdc0d3b1abb
Author: Zhenyu Li <[email protected]>
AuthorDate: Thu May 14 14:48:37 2026 -0400
HDFS-17899. Handle InvalidEncryptionKeyException in Balancer Dispatcher,
SPS BlockDispatcher and DataNode DataTransfer (#8383)
---
.../sasl/DataEncryptionKeyFactory.java | 10 ++
.../hadoop/hdfs/server/balancer/Dispatcher.java | 73 +++++++---
.../hadoop/hdfs/server/balancer/KeyManager.java | 18 +++
.../hdfs/server/common/sps/BlockDispatcher.java | 61 +++++---
.../hadoop/hdfs/server/datanode/DataNode.java | 51 +++++--
.../balancer/TestDispatcherEncryptionKey.java | 97 +++++++++++++
.../hdfs/server/balancer/TestKeyManager.java | 108 ++++++++++++++-
.../server/common/sps/TestBlockDispatcher.java | 153 +++++++++++++++++++++
.../datanode/TestDataTransferEncryptionKey.java | 75 ++++++++++
9 files changed, 595 insertions(+), 51 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
index 959cba0fb48..a1674232fd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
@@ -35,4 +35,14 @@ public interface DataEncryptionKeyFactory {
* @throws IOException for any error
*/
DataEncryptionKey newDataEncryptionKey() throws IOException;
+
+ /**
+ * Clear the cached data encryption key, so that a new key will be
+ * generated on the next call to {@link #newDataEncryptionKey()}.
+ * This is called when an InvalidEncryptionKeyException is received
+ * to force a key refresh on retry.
+ *
+ * <p>The default implementation does nothing, so existing implementations
+ * of this interface keep compiling unchanged; an implementation that
+ * caches a key overrides this method to discard it.
+ */
+ default void clearDataEncryptionKey() {
+ // no-op by default
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index acac65d7745..4fcf17f4a3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
@@ -372,29 +373,49 @@ private void dispatch() {
LOG.info("Start moving " + this);
assert !(reportedBlock instanceof DBlockStriped);
- sock.connect(
- NetUtils.createSocketAddr(target.getDatanodeInfo().
- getXferAddr(Dispatcher.this.connectToDnViaHostname)),
- HdfsConstants.READ_TIMEOUT);
-
- // Set read timeout so that it doesn't hang forever against
- // unresponsive nodes. Datanode normally sends IN_PROGRESS response
- // twice within the client read timeout period (every 30 seconds by
- // default). Here, we make it give up after 5 minutes of no response.
- sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
- sock.setKeepAlive(true);
-
- OutputStream unbufOut = sock.getOutputStream();
- InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
reportedBlock.getBlock());
- final KeyManager km = nnc.getKeyManager();
- Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
- new StorageType[]{target.storageType}, new String[0]);
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, km, accessToken, target.getDatanodeInfo());
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
+ final KeyManager km = nnc.getKeyManager();
+ Token<BlockTokenIdentifier> accessToken = null;
+ OutputStream unbufOut;
+ InputStream unbufIn;
+ int encryptionKeyRetryCount = 0;
+ while (true) {
+ try {
+ accessToken = km.getAccessToken(eb,
+ new StorageType[]{target.storageType}, new String[0]);
+ sock.connect(
+ NetUtils.createSocketAddr(target.getDatanodeInfo().
+ getXferAddr(Dispatcher.this.connectToDnViaHostname)),
+ HdfsConstants.READ_TIMEOUT);
+
+ // Set read timeout so that it doesn't hang forever against
+ // unresponsive nodes. Datanode normally sends IN_PROGRESS
+ // response twice within the client read timeout period (every
+ // 30 seconds by default). Here, we make it give up after 5
+ // minutes of no response.
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
+ sock.setKeepAlive(true);
+
+ unbufOut = sock.getOutputStream();
+ unbufIn = sock.getInputStream();
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, km, accessToken, target.getDatanodeInfo());
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ break;
+ } catch (InvalidEncryptionKeyException e) {
+ IOUtils.closeSocket(sock);
+ if (!prepareRetryAfterInvalidEncryptionKey(km,
+ ++encryptionKeyRetryCount)) {
+ throw e;
+ }
+ LOG.info("Retrying connection to {} for block {} after "
+ + "InvalidEncryptionKeyException",
+ target.getDatanodeInfo(), reportedBlock.getBlock(), e);
+ sock = new Socket();
+ }
+ }
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
ioFileBufferSize));
in = new DataInputStream(new BufferedInputStream(unbufIn,
@@ -484,6 +505,16 @@ private void reset() {
}
}
+ private static boolean prepareRetryAfterInvalidEncryptionKey(KeyManager km,
+ int retryCount) throws IOException {
+ if (retryCount > 1) { // retryCount is the 1-based number of failures seen so far
+ return false; // second failure: tell the caller to give up and rethrow
+ }
+ km.updateBlockKeys(); // re-fetch block keys from the NameNode before retrying
+ km.clearDataEncryptionKey(); // drop the cached key so the retry asks for a new one
+ return true; // first failure: the caller may retry once
+ }
+
/** A class for keeping track of block locations in the dispatcher. */
public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
public DBlock(Block block) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 5644ef7d7da..3c0ed9a7eee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -148,6 +148,24 @@ public DataEncryptionKey newDataEncryptionKey() {
}
}
+ /**
+ * Clear the cached data encryption key, so that a new key will be generated
+ * on the next call to {@link #newDataEncryptionKey()}.
+ *
+ * <p>The balancer Dispatcher calls this before retrying a connection that
+ * failed with InvalidEncryptionKeyException.
+ */
+ @Override
+ public synchronized void clearDataEncryptionKey() {
+ LOG.debug("Clearing data encryption key");
+ encryptionKey = null; // forget the cached key; nothing else is reset here
+ }
+
+ /** Update block keys from the NameNode.
+ *
+ * <p>Fetches the current keys with {@code namenode.getBlockKeys()} and adds
+ * them to the block token secret manager. Does nothing when block tokens
+ * are not enabled.
+ *
+ * @throws IOException if fetching or adding the keys fails
+ */
+ public void updateBlockKeys() throws IOException {
+ if (isBlockTokenEnabled) { // skip the NameNode call when block tokens are off
+ LOG.debug("Updating block keys from NameNode");
+ blockTokenSecretManager.addKeys(namenode.getBlockKeys());
+ }
+ }
+
@Override
public void close() {
shouldRun = false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
index f7756c74851..95e66112f5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
@@ -27,16 +27,19 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -110,25 +113,42 @@ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
DataOutputStream out = null;
DataInputStream in = null;
try {
- NetUtils.connect(sock,
- NetUtils.createSocketAddr(
- blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)),
- socketTimeout);
- // Set read timeout so that it doesn't hang forever against
- // unresponsive nodes. Datanode normally sends IN_PROGRESS response
- // twice within the client read timeout period (every 30 seconds by
- // default). Here, we make it give up after "socketTimeout * 5" period
- // of no response.
- sock.setSoTimeout(socketTimeout * 5);
- sock.setKeepAlive(true);
- OutputStream unbufOut = sock.getOutputStream();
- InputStream unbufIn = sock.getInputStream();
- LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+ blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname));
+ OutputStream unbufOut;
+ InputStream unbufIn;
+ int encryptionKeyRetryCount = 0;
+ while (true) {
+ try {
+ NetUtils.connect(sock, targetAddr, socketTimeout);
+ // Set read timeout so that it doesn't hang forever against
+ // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+ // twice within the client read timeout period (every 30 seconds by
+ // default). Here, we make it give up after "socketTimeout * 5"
+ // period of no response.
+ sock.setSoTimeout(socketTimeout * 5);
+ sock.setKeepAlive(true);
+ unbufOut = sock.getOutputStream();
+ unbufIn = sock.getInputStream();
+ LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, km, accessToken, blkMovingInfo.getTarget());
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, km, accessToken, blkMovingInfo.getTarget());
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ break;
+ } catch (InvalidEncryptionKeyException e) {
+ IOUtils.closeSocket(sock);
+ if (++encryptionKeyRetryCount > 1) {
+ throw e;
+ }
+ LOG.info("Retrying connection to {} for block {} after "
+ + "InvalidEncryptionKeyException",
+ blkMovingInfo.getTarget(), blkMovingInfo.getBlock(), e);
+ km.clearDataEncryptionKey();
+ sock = newSocket();
+ }
+ }
out = new DataOutputStream(
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
@@ -175,4 +195,9 @@ private static void receiveResponse(DataInputStream in) throws IOException {
String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
}
+
+ @VisibleForTesting
+ Socket newSocket() { // package-private so a test subclass can substitute its own socket
+ return new Socket(); // unconnected; moveBlock connects it on the retry attempt
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7ee9a4b1729..2faa9c79a6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -192,6 +192,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
@@ -3063,10 +3064,6 @@ public void run() {
final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
LOG.debug("Connecting to datanode {}", dnAddr);
- sock = newSocket();
- NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
- sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
- sock.setSoTimeout(targets.length * dnConf.socketTimeout);
//
// Header info
@@ -3077,15 +3074,38 @@ public void run() {
long writeTimeout = dnConf.socketWriteTimeout +
HdfsConstants.WRITE_TIMEOUT_EXTENSION *
(targets.length-1);
- OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
- InputStream unbufIn = NetUtils.getInputStream(sock);
DataEncryptionKeyFactory keyFactory =
getDataEncryptionKeyFactoryForBlock(b);
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, keyFactory, accessToken, bpReg);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
-
+ OutputStream unbufOut;
+ InputStream unbufIn;
+ int encryptionKeyRetryCount = 0;
+ while (true) {
+ try {
+ sock = newSocket();
+ NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
+ sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
+ sock.setSoTimeout(targets.length * dnConf.socketTimeout);
+
+ unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+ unbufIn = NetUtils.getInputStream(sock);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, keyFactory, accessToken, bpReg);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ break;
+ } catch (InvalidEncryptionKeyException e) {
+ IOUtils.closeSocket(sock);
+ sock = null;
+ if (!prepareRetryAfterInvalidEncryptionKey(keyFactory,
+ ++encryptionKeyRetryCount)) {
+ throw e;
+ }
+ LOG.info("Retrying connection to {} for block {} after "
+ + "InvalidEncryptionKeyException",
+ curTarget, b, e);
+ }
+ }
+
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtilClient.getSmallBufferSize(getConf())));
in = new DataInputStream(unbufIn);
@@ -3149,6 +3169,15 @@ public String toString() {
}
}
+ private static boolean prepareRetryAfterInvalidEncryptionKey(
+ DataEncryptionKeyFactory keyFactory, int retryCount) {
+ if (retryCount > 1) { // retryCount is the 1-based number of failures seen so far
+ return false; // second failure: tell the caller to give up and rethrow
+ }
+ keyFactory.clearDataEncryptionKey(); // no-op unless the factory overrides the default
+ return true; // first failure: the caller may retry once
+ }
+
/***
* Use BlockTokenSecretManager to generate block token for current user.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java
new file mode 100644
index 00000000000..27356405a62
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test Dispatcher handling of InvalidEncryptionKeyException.
+ *
+ * <p>Only the private static retry helper is exercised, through reflection;
+ * no socket is opened and no block is moved.
+ */
+@Timeout(120)
+public class TestDispatcherEncryptionKey {
+
+ /**
+ * Verify that the dispatcher refreshes the block keys and clears the cached
+ * data encryption key before retrying InvalidEncryptionKeyException.
+ *
+ * <p>A retry count of 1 must allow the retry and touch each key operation
+ * once; a retry count of 2 must refuse and leave both counters unchanged.
+ */
+ @Test
+ public void testClearEncryptionKeyOnRetry() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ CountingKeyManager km = new CountingKeyManager(conf);
+
+ assertTrue(prepareRetryAfterInvalidEncryptionKey(km, 1)); // first failure: retry allowed
+ assertEquals(1, km.updateBlockKeysCount);
+ assertEquals(1, km.clearKeyCount);
+
+ assertFalse(prepareRetryAfterInvalidEncryptionKey(km, 2)); // second failure: refused
+ assertEquals(1, km.updateBlockKeysCount); // unchanged: no key refresh on refusal
+ assertEquals(1, km.clearKeyCount); // unchanged: no key clear on refusal
+ }
+
+ private static boolean prepareRetryAfterInvalidEncryptionKey(KeyManager km,
+ int retryCount) throws Exception {
+ Method method = Dispatcher.class.getDeclaredMethod(
+ "prepareRetryAfterInvalidEncryptionKey", KeyManager.class, int.class);
+ method.setAccessible(true); // the target method is private
+ return (boolean) method.invoke(null, km, retryCount); // null receiver: target is static
+ }
+
+ private static final class CountingKeyManager extends KeyManager {
+ private int updateBlockKeysCount; // calls to updateBlockKeys()
+ private int clearKeyCount; // calls to clearDataEncryptionKey()
+
+ private CountingKeyManager(Configuration conf) throws Exception {
+ super("bp-test", createNamenode(), false, conf);
+ }
+
+ @Override
+ public void updateBlockKeys() {
+ updateBlockKeysCount++; // count only; the parent implementation is not invoked
+ }
+
+ @Override
+ public synchronized void clearDataEncryptionKey() {
+ clearKeyCount++; // count only; the parent implementation is not invoked
+ }
+ }
+
+ private static NamenodeProtocol createNamenode() {
+ return (NamenodeProtocol) Proxy.newProxyInstance(
+ NamenodeProtocol.class.getClassLoader(),
+ new Class<?>[]{NamenodeProtocol.class},
+ (proxy, method, args) -> {
+ if ("getBlockKeys".equals(method.getName())) { // the only call this stub answers
+ return ExportedBlockKeys.DUMMY_KEYS;
+ }
+ throw new UnsupportedOperationException(method.getName()); // any other call fails loudly
+ });
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
index 3d60a73510b..9d8cbc93049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.FakeTimer;
@@ -31,6 +35,9 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -82,4 +89,103 @@ public void testNewDataEncryptionKey() throws Exception {
assertTrue(dekAfterExpiration.expiryDate > fakeTimer.now(),
"KeyManager has an expired DataEncryptionKey!");
}
-}
\ No newline at end of file
+
+ @Test
+ public void testClearDataEncryptionKey() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+ final long keyUpdateInterval = 2 * 1000; // presumably milliseconds — TODO confirm
+ final long tokenLifeTime = keyUpdateInterval;
+ final String blockPoolId = "bp-foo";
+ FakeTimer fakeTimer = new FakeTimer();
+ BlockTokenSecretManager btsm = new BlockTokenSecretManager(
+ keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null, false);
+ Whitebox.setInternalState(btsm, "timer", fakeTimer); // replace the "timer" field with the fake
+
+ NamenodeProtocol namenode = createNamenode(btsm.exportKeys(), null); // null: calls are not counted
+
+ KeyManager keyManager = new KeyManager(blockPoolId, namenode,
+ true, conf);
+ Whitebox.setInternalState(keyManager, "timer", fakeTimer);
+ Whitebox.setInternalState(
+ Whitebox.getInternalState(keyManager, "blockTokenSecretManager"),
+ "timer", fakeTimer); // the manager's own secret manager gets the same fake timer
+
+ // Get initial encryption key
+ final DataEncryptionKey dek1 = keyManager.newDataEncryptionKey();
+ assertNotNull(dek1, "Encryption key should not be null");
+
+ // Same cached key should be returned when not expired
+ final DataEncryptionKey dek1Again = keyManager.newDataEncryptionKey();
+ assertSame(dek1, dek1Again,
+ "Should return cached key when not expired");
+
+ // Clear the cached encryption key
+ keyManager.clearDataEncryptionKey();
+
+ // After clearing, a new key should be generated
+ final DataEncryptionKey dek2 = keyManager.newDataEncryptionKey();
+ assertNotNull(dek2, "New encryption key should not be null");
+ assertNotSame(dek1, dek2,
+ "Should generate a new key after clearing cached key"); // identity check, not equals()
+ assertTrue(dek2.expiryDate > fakeTimer.now(),
+ "New encryption key should not be expired");
+ }
+
+ @Test
+ public void testUpdateBlockKeysThenClearDataEncryptionKey()
+ throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+ final long keyUpdateInterval = 2 * 1000; // presumably milliseconds — TODO confirm
+ final long tokenLifeTime = keyUpdateInterval;
+ final String blockPoolId = "bp-foo";
+ FakeTimer fakeTimer = new FakeTimer();
+ BlockTokenSecretManager btsm = new BlockTokenSecretManager(
+ keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null, false);
+ Whitebox.setInternalState(btsm, "timer", fakeTimer); // replace the "timer" field with the fake
+
+ AtomicInteger getBlockKeysCount = new AtomicInteger(); // counts getBlockKeys() calls on the stub
+ NamenodeProtocol namenode =
+ createNamenode(btsm.exportKeys(), getBlockKeysCount);
+
+ KeyManager keyManager = new KeyManager(blockPoolId, namenode,
+ true, conf);
+ Whitebox.setInternalState(keyManager, "timer", fakeTimer);
+ Whitebox.setInternalState(
+ Whitebox.getInternalState(keyManager, "blockTokenSecretManager"),
+ "timer", fakeTimer); // the manager's own secret manager gets the same fake timer
+
+ final DataEncryptionKey dek1 = keyManager.newDataEncryptionKey();
+ assertNotNull(dek1, "Encryption key should not be null");
+
+ keyManager.updateBlockKeys(); // same order the Dispatcher retry helper uses
+ keyManager.clearDataEncryptionKey();
+
+ final DataEncryptionKey dek2 = keyManager.newDataEncryptionKey();
+ assertNotNull(dek2, "New encryption key should not be null");
+ assertNotSame(dek1, dek2,
+ "Should generate a new key after updating block keys and clearing");
+ assertEquals(2, getBlockKeysCount.get()); // one from updateBlockKeys(); the other presumably from the KeyManager constructor — verify
+ }
+
+ private static NamenodeProtocol createNamenode(ExportedBlockKeys blockKeys,
+ AtomicInteger getBlockKeysCount) {
+ return (NamenodeProtocol) Proxy.newProxyInstance(
+ NamenodeProtocol.class.getClassLoader(),
+ new Class<?>[]{NamenodeProtocol.class},
+ (proxy, method, args) -> {
+ if ("getBlockKeys".equals(method.getName())) { // the only call this stub answers
+ if (getBlockKeysCount != null) { // counting is optional; callers may pass null
+ getBlockKeysCount.incrementAndGet();
+ }
+ return blockKeys; // always hands back the keys supplied by the test
+ }
+ throw new UnsupportedOperationException(method.getName()); // any other call fails loudly
+ });
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java
new file mode 100644
index 00000000000..ee553de27d6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test BlockDispatcher class.
+ *
+ * <p>The SASL client, key factory and socket are all in-file stubs, so no
+ * real network connection is made.
+ */
+@Timeout(60)
+public class TestBlockDispatcher {
+
+ /**
+ * Verify that when InvalidEncryptionKeyException is encountered during
+ * block move, the dispatcher clears the cached data encryption key before
+ * retry.
+ *
+ * <p>The stub SASL client throws on every handshake, so the expected
+ * outcome is two handshake attempts (the initial one plus one retry), one
+ * key clear, and the exception propagating out of moveBlock.
+ */
+ @Test
+ public void testClearEncryptionKeyOnRetry() throws Exception {
+ DatanodeInfo target =
+ DFSTestUtil.getDatanodeInfo("127.0.0.1", "localhost", 1);
+ DatanodeInfo source =
+ DFSTestUtil.getDatanodeInfo("127.0.0.1", "localhost", 2);
+
+ BlockMovingInfo blkMovingInfo = new BlockMovingInfo(
+ new Block(1, 100, 1001),
+ source, target,
+ StorageType.DISK, StorageType.ARCHIVE);
+
+ InvalidKeySaslClient saslClient = new InvalidKeySaslClient();
+ CountingKeyFactory km = new CountingKeyFactory();
+ ExtendedBlock eb = new ExtendedBlock("bp-1", 1, 100, 1001);
+ Token<BlockTokenIdentifier> accessToken = new Token<>();
+
+ // Use small socketTimeout (100ms) to keep test fast.
+ BlockDispatcher dispatcher = new BlockDispatcher(100, 1024, false) {
+ @Override
+ Socket newSocket() {
+ return new FakeSocket(); // the retry attempt also gets a fake socket
+ }
+ };
+
+ assertThrows(InvalidEncryptionKeyException.class,
+ () -> dispatcher.moveBlock(blkMovingInfo, saslClient, eb,
+ new FakeSocket(), km, accessToken));
+
+ assertEquals(1, km.clearCount); // cleared once, before the single retry
+ assertEquals(2, saslClient.socketSendCount); // initial attempt + one retry
+ }
+
+ private static final class InvalidKeySaslClient
+ extends SaslDataTransferClient {
+ private int socketSendCount; // number of handshake attempts seen
+
+ private InvalidKeySaslClient() {
+ super(null, null, null);
+ }
+
+ @Override
+ public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKeyFactory
encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ throws InvalidEncryptionKeyException {
+ socketSendCount++;
+ throw new InvalidEncryptionKeyException("test: encryption key expired"); // fails every attempt
+ }
+ }
+
+ private static final class CountingKeyFactory
+ implements DataEncryptionKeyFactory {
+ private int clearCount; // calls to clearDataEncryptionKey()
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ return null; // never consulted by the stub SASL client above
+ }
+
+ @Override
+ public void clearDataEncryptionKey() {
+ clearCount++;
+ }
+ }
+
+ private static final class FakeSocket extends Socket {
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ private final ByteArrayInputStream in =
+ new ByteArrayInputStream(new byte[0]);
+
+ @Override
+ public void connect(SocketAddress endpoint, int timeout) {
+ } // intentionally empty: connecting is a no-op
+
+ @Override
+ public SocketChannel getChannel() {
+ return null; // presumably steers NetUtils.connect to the plain connect() path — verify
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ return out;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return in;
+ }
+
+ @Override
+ public void close() {
+ } // intentionally empty: nothing was opened
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java
new file mode 100644
index 00000000000..aeff6ad9776
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test DataNode.DataTransfer handling of InvalidEncryptionKeyException.
+ *
+ * <p>Only the private static retry helper on DataNode is exercised, through
+ * reflection; no DataNode instance is started.
+ */
+@Timeout(60)
+public class TestDataTransferEncryptionKey {
+
+ /**
+ * Verify that DataTransfer clears the cached data encryption key before
+ * retrying InvalidEncryptionKeyException.
+ *
+ * <p>A retry count of 1 must allow the retry and clear the key once; a
+ * retry count of 2 must refuse and leave the counter unchanged.
+ */
+ @Test
+ public void testClearEncryptionKeyOnRetry() throws Exception {
+ CountingKeyFactory keyFactory = new CountingKeyFactory();
+
+ assertTrue(prepareRetryAfterInvalidEncryptionKey(keyFactory, 1)); // first failure: retry allowed
+ assertEquals(1, keyFactory.clearCount);
+
+ assertFalse(prepareRetryAfterInvalidEncryptionKey(keyFactory, 2)); // second failure: refused
+ assertEquals(1, keyFactory.clearCount); // unchanged: no key clear on refusal
+ }
+
+ private static boolean prepareRetryAfterInvalidEncryptionKey(
+ DataEncryptionKeyFactory keyFactory, int retryCount) throws Exception {
+ Method method = DataNode.class.getDeclaredMethod(
+ "prepareRetryAfterInvalidEncryptionKey",
+ DataEncryptionKeyFactory.class, int.class);
+ method.setAccessible(true); // the target method is private
+ return (boolean) method.invoke(null, keyFactory, retryCount); // null receiver: target is static
+ }
+
+ private static final class CountingKeyFactory
+ implements DataEncryptionKeyFactory {
+ private int clearCount; // calls to clearDataEncryptionKey()
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ return null; // not used by the helper under test
+ }
+
+ @Override
+ public void clearDataEncryptionKey() {
+ clearCount++;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]