This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to
refs/heads/HDDS-10239-container-reconciliation by this push:
new 9765a6a134 HDDS-11253. Handle corrupted merkle tree files (#7083)
9765a6a134 is described below
commit 9765a6a1347eb88c93f6423b8d6b4d9e6d55c115
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Sep 10 13:37:06 2024 -0400
HDDS-11253. Handle corrupted merkle tree files (#7083)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 1 +
.../ContainerCommandResponseBuilders.java | 10 +-
.../hdds/scm/storage/ContainerProtocolCalls.java | 23 +-
.../org/apache/hadoop/ozone/audit/DNAction.java | 2 +-
.../checksum/ContainerChecksumTreeManager.java | 154 +++++++------
.../checksum/DNContainerOperationClient.java | 27 ++-
.../container/common/impl/HddsDispatcher.java | 2 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 27 ++-
.../checksum/ContainerMerkleTreeTestUtils.java | 23 ++
.../checksum/TestContainerChecksumTreeManager.java | 124 ++++++++---
.../container/keyvalue/TestKeyValueHandler.java | 34 +++
.../src/main/proto/DatanodeClientProtocol.proto | 12 +-
.../ozoneimpl/TestOzoneContainerWithTLS.java | 10 +-
.../TestContainerCommandReconciliation.java | 248 ++++++++++++++-------
14 files changed, 496 insertions(+), 201 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 794b972f15..ff0cef43c9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -424,6 +424,7 @@ public final class HddsUtils {
case ListContainer:
case ListChunk:
case GetCommittedBlockLength:
+ case GetContainerChecksumInfo:
return true;
case CloseContainer:
case WriteChunk:
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index f3476b72e3..f0b483900d 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -371,14 +371,14 @@ public final class ContainerCommandResponseBuilders {
}
public static ContainerCommandResponseProto
getGetContainerMerkleTreeResponse(
- ContainerCommandRequestProto request, ByteString checksumTree) {
+ ContainerCommandRequestProto request, ByteString checksumInfo) {
- ContainerProtos.GetContainerMerkleTreeResponseProto.Builder
containerMerkleTree =
- ContainerProtos.GetContainerMerkleTreeResponseProto.newBuilder()
+ ContainerProtos.GetContainerChecksumInfoResponseProto.Builder
containerMerkleTree =
+ ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder()
.setContainerID(request.getContainerID())
- .setContainerMerkleTree(checksumTree);
+ .setContainerChecksumInfo(checksumInfo);
return getSuccessResponseBuilder(request)
- .setGetContainerMerkleTree(containerMerkleTree).build();
+ .setGetContainerChecksumInfo(containerMerkleTree).build();
}
private ContainerCommandResponseBuilders() {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 2a6ef2dea5..9b44051e06 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -768,15 +768,21 @@ public final class ContainerProtocolCalls {
}
/**
- * Gets the Container merkle tree for a container from a datanode.
+ * Gets the container checksum info for a container from a datanode. This
method does not deserialize the checksum
+ * info.
+ *
* @param client - client that communicates with the container
* @param containerID - Container Id of the container
* @param encodedContainerID - Encoded token if security is enabled
+ *
+ * @throws IOException For errors communicating with the datanode.
+ * @throws StorageContainerException For errors obtaining the checksum info,
including the file being missing or
+ * empty on the datanode, or the datanode not having a replica of the
container.
*/
- public static ContainerProtos.GetContainerMerkleTreeResponseProto
getContainerMerkleTree(
+ public static ContainerProtos.GetContainerChecksumInfoResponseProto
getContainerChecksumInfo(
XceiverClientSpi client, long containerID, String encodedContainerID)
throws IOException {
- ContainerProtos.GetContainerMerkleTreeRequestProto
containerMerkleTreeRequestProto =
- ContainerProtos.GetContainerMerkleTreeRequestProto
+ ContainerProtos.GetContainerChecksumInfoRequestProto
containerChecksumRequestProto =
+ ContainerProtos.GetContainerChecksumInfoRequestProto
.newBuilder()
.setContainerID(containerID)
.build();
@@ -784,10 +790,10 @@ public final class ContainerProtocolCalls {
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
- .setCmdType(Type.GetContainerMerkleTree)
+ .setCmdType(Type.GetContainerChecksumInfo)
.setContainerID(containerID)
.setDatanodeUuid(id)
- .setGetContainerMerkleTree(containerMerkleTreeRequestProto);
+ .setGetContainerChecksumInfo(containerChecksumRequestProto);
if (encodedContainerID != null) {
builder.setEncodedToken(encodedContainerID);
}
@@ -796,9 +802,8 @@ public final class ContainerProtocolCalls {
builder.setTraceID(traceId);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response =
- client.sendCommand(request, getValidatorList());
- return response.getGetContainerMerkleTree();
+ ContainerCommandResponseProto response = client.sendCommand(request,
getValidatorList());
+ return response.getGetContainerChecksumInfo();
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index be594062d0..b4e1687c28 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -42,7 +42,7 @@ public enum DNAction implements AuditAction {
STREAM_INIT,
FINALIZE_BLOCK,
ECHO,
- GET_CONTAINER_MERKLE_TREE;
+ GET_CONTAINER_CHECKSUM_INFO;
@Override
public String getAction() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index 4902878d74..769515d96e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -29,18 +29,22 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.hdds.utils.SimpleStriped;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
/**
@@ -50,17 +54,18 @@ public class ContainerChecksumTreeManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerChecksumTreeManager.class);
- // Used to coordinate reads and writes to each container's checksum file.
+ // Used to coordinate writes to each container's checksum file.
// Each container ID is mapped to a stripe.
- private final Striped<ReadWriteLock> fileLock;
+ // The file is atomically renamed into place, so readers do not need
coordination.
+ private final Striped<Lock> fileLock;
private final ContainerMerkleTreeMetrics metrics;
/**
* Creates one instance that should be used to coordinate all container
checksum info within a datanode.
*/
public ContainerChecksumTreeManager(ConfigurationSource conf) {
- fileLock = SimpleStriped.readWriteLock(
-
conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(),
true);
+ fileLock =
SimpleStriped.custom(conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(),
+ () -> new ReentrantLock(true));
metrics = ContainerMerkleTreeMetrics.create();
}
@@ -75,14 +80,26 @@ public class ContainerChecksumTreeManager {
* Concurrent writes to the same file are coordinated internally.
*/
public void writeContainerDataTree(ContainerData data, ContainerMerkleTree
tree) throws IOException {
- Lock writeLock = getWriteLock(data.getContainerID());
+ long containerID = data.getContainerID();
+ Lock writeLock = getLock(containerID);
writeLock.lock();
try {
- ContainerProtos.ContainerChecksumInfo newChecksumInfo =
read(data).toBuilder()
-
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
tree::toProto))
- .build();
- write(data, newChecksumInfo);
- LOG.debug("Data merkle tree for container {} updated",
data.getContainerID());
+ ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
+ try {
+ // If the file is not present, we will create the data for the first
time. This happens under a write lock.
+ checksumInfoBuilder = read(data)
+ .orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
+ } catch (IOException ex) {
+ LOG.error("Failed to read container checksum tree file for container
{}. Overwriting it with a new instance.",
+ containerID, ex);
+ checksumInfoBuilder =
ContainerProtos.ContainerChecksumInfo.newBuilder();
+ }
+
+ checksumInfoBuilder
+ .setContainerID(containerID)
+
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
tree::toProto));
+ write(data, checksumInfoBuilder.build());
+ LOG.debug("Data merkle tree for container {} updated", containerID);
} finally {
writeLock.unlock();
}
@@ -94,21 +111,33 @@ public class ContainerChecksumTreeManager {
* Concurrent writes to the same file are coordinated internally.
*/
public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long>
deletedBlockIDs) throws IOException {
- Lock writeLock = getWriteLock(data.getContainerID());
+ long containerID = data.getContainerID();
+ Lock writeLock = getLock(containerID);
writeLock.lock();
try {
- ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder =
read(data).toBuilder();
+ ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
+ try {
+ // If the file is not present, we will create the data for the first
time. This happens under a write lock.
+ checksumInfoBuilder = read(data)
+ .orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
+ } catch (IOException ex) {
+ LOG.error("Failed to read container checksum tree file for container
{}. Overwriting it with a new instance.",
+ data.getContainerID(), ex);
+ checksumInfoBuilder =
ContainerProtos.ContainerChecksumInfo.newBuilder();
+ }
+
// Although the persisted block list should already be sorted, we will
sort it here to make sure.
// This will automatically fix any bugs in the persisted order that may
show up.
SortedSet<Long> sortedDeletedBlockIDs = new
TreeSet<>(checksumInfoBuilder.getDeletedBlocksList());
sortedDeletedBlockIDs.addAll(deletedBlockIDs);
checksumInfoBuilder
+ .setContainerID(containerID)
.clearDeletedBlocks()
- .addAllDeletedBlocks(sortedDeletedBlockIDs)
- .build();
+ .addAllDeletedBlocks(sortedDeletedBlockIDs);
write(data, checksumInfoBuilder.build());
- LOG.debug("Deleted block list for container {} updated",
data.getContainerID());
+ LOG.debug("Deleted block list for container {} updated with {} new
blocks", data.getContainerID(),
+ sortedDeletedBlockIDs.size());
} finally {
writeLock.unlock();
}
@@ -125,83 +154,80 @@ public class ContainerChecksumTreeManager {
/**
* Returns the container checksum tree file for the specified container
without deserializing it.
*/
+ @VisibleForTesting
public static File getContainerChecksumFile(ContainerData data) {
return new File(data.getMetadataPath(), data.getContainerID() + ".tree");
}
- private Lock getReadLock(long containerID) {
- return fileLock.get(containerID).readLock();
+ @VisibleForTesting
+ public static File getTmpContainerChecksumFile(ContainerData data) {
+ return new File(data.getMetadataPath(), data.getContainerID() +
".tree.tmp");
}
- private Lock getWriteLock(long containerID) {
- return fileLock.get(containerID).writeLock();
+ private Lock getLock(long containerID) {
+ return fileLock.get(containerID);
}
- private ContainerProtos.ContainerChecksumInfo read(ContainerData data)
throws IOException {
+ /**
+ * Callers are not required to hold a lock while calling this since writes
are done to a tmp file and atomically
+ * swapped into place.
+ */
+ private Optional<ContainerProtos.ContainerChecksumInfo.Builder>
read(ContainerData data) throws IOException {
long containerID = data.getContainerID();
- Lock readLock = getReadLock(containerID);
- readLock.lock();
+ File checksumFile = getContainerChecksumFile(data);
try {
- File checksumFile = getContainerChecksumFile(data);
- // If the checksum file has not been created yet, return an empty
instance.
- // Since all writes happen as part of an atomic read-modify-write cycle
that requires a write lock, two empty
- // instances for the same container obtained only under the read lock
will not conflict.
if (!checksumFile.exists()) {
- LOG.debug("No checksum file currently exists for container {} at the
path {}. Returning an empty instance.",
- containerID, checksumFile);
- return ContainerProtos.ContainerChecksumInfo.newBuilder()
- .setContainerID(containerID)
- .build();
+ LOG.debug("No checksum file currently exists for container {} at the
path {}", containerID, checksumFile);
+ return Optional.empty();
}
try (FileInputStream inStream = new FileInputStream(checksumFile)) {
return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(),
- () -> ContainerProtos.ContainerChecksumInfo.parseFrom(inStream));
+ () ->
Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream).toBuilder()));
}
} catch (IOException ex) {
metrics.incrementMerkleTreeReadFailures();
throw new IOException("Error occurred when reading container merkle tree
for containerID "
- + data.getContainerID(), ex);
- } finally {
- readLock.unlock();
+ + data.getContainerID() + " at path " + checksumFile, ex);
}
}
+ /**
+ * Callers should have acquired the write lock before calling this method.
+ */
private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo
checksumInfo) throws IOException {
- Lock writeLock = getWriteLock(data.getContainerID());
- writeLock.lock();
- try (FileOutputStream outStream = new
FileOutputStream(getContainerChecksumFile(data))) {
- captureLatencyNs(metrics.getWriteContainerMerkleTreeLatencyNS(),
- () -> checksumInfo.writeTo(outStream));
+ // Make sure callers filled in required fields before writing.
+ Preconditions.assertTrue(checksumInfo.hasContainerID());
+
+ File checksumFile = getContainerChecksumFile(data);
+ File tmpChecksumFile = getTmpContainerChecksumFile(data);
+
+ try (FileOutputStream tmpOutputStream = new
FileOutputStream(tmpChecksumFile)) {
+ // Write to a tmp file and rename it into place.
+ captureLatencyNs(metrics.getWriteContainerMerkleTreeLatencyNS(), () -> {
+ checksumInfo.writeTo(tmpOutputStream);
+ Files.move(tmpChecksumFile.toPath(), checksumFile.toPath(),
ATOMIC_MOVE);
+ });
} catch (IOException ex) {
+ // If the move failed and left behind the tmp file, the tmp file will be
overwritten on the next successful write.
+ // Nothing reads directly from the tmp file.
metrics.incrementMerkleTreeWriteFailures();
throw new IOException("Error occurred when writing container merkle tree
for containerID "
+ data.getContainerID(), ex);
- } finally {
- writeLock.unlock();
}
}
- public ByteString getContainerChecksumInfo(KeyValueContainerData data)
- throws IOException {
- long containerID = data.getContainerID();
- Lock readLock = getReadLock(containerID);
- readLock.lock();
- try {
- File checksumFile = getContainerChecksumFile(data);
-
- try (FileInputStream inStream = new FileInputStream(checksumFile)) {
- return ByteString.readFrom(inStream);
- } catch (FileNotFoundException ex) {
- // TODO: Build the container checksum tree when it doesn't exist.
- LOG.debug("No checksum file currently exists for container {} at the
path {}. Returning an empty instance.",
- containerID, checksumFile, ex);
- } catch (IOException ex) {
- throw new IOException("Error occured when reading checksum file for
container " + containerID +
- " at the path " + checksumFile, ex);
- }
- return ByteString.EMPTY;
- } finally {
- readLock.unlock();
+ /**
+ * Reads the container checksum info file from the disk as bytes.
+ * Callers are not required to hold a lock while calling this since writes
are done to a tmp file and atomically
+ * swapped into place.
+ *
+ * @throws FileNotFoundException When the file does not exist. It may not
have been generated yet for this container.
+ * @throws IOException On error reading the file.
+ */
+ public ByteString getContainerChecksumInfo(KeyValueContainerData data)
throws IOException {
+ File checksumFile = getContainerChecksumFile(data);
+ try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+ return ByteString.readFrom(inStream);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
index bdf75763e0..969add4a15 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
@@ -40,6 +41,7 @@ import jakarta.annotation.Nonnull;
import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,16 +91,33 @@ public class DNContainerOperationClient implements
AutoCloseable {
return xceiverClientManager;
}
- public ByteString getContainerMerkleTree(long containerId, DatanodeDetails
dn)
+ /**
+ * Reads {@link ContainerProtos.ContainerChecksumInfo} for a specified
container for the specified datanode.
+ *
+ * @throws IOException For errors communicating with the datanode.
+ * @throws StorageContainerException For errors obtaining the checksum info,
including the file being missing or
+ * empty on the datanode, or the datanode not having a replica of the
container.
+ * @throws InvalidProtocolBufferException If the file received from the
datanode cannot be deserialized.
+ */
+ public ContainerProtos.ContainerChecksumInfo getContainerChecksumInfo(long
containerId, DatanodeDetails dn)
throws IOException {
XceiverClientSpi xceiverClient =
this.xceiverClientManager.acquireClient(createSingleNodePipeline(dn));
try {
String containerToken = encode(tokenHelper.getContainerToken(
ContainerID.valueOf(containerId)));
- ContainerProtos.GetContainerMerkleTreeResponseProto response =
- ContainerProtocolCalls.getContainerMerkleTree(xceiverClient,
+ ContainerProtos.GetContainerChecksumInfoResponseProto response =
+ ContainerProtocolCalls.getContainerChecksumInfo(xceiverClient,
containerId, containerToken);
- return response.getContainerMerkleTree();
+ ByteString serializedChecksumInfo = response.getContainerChecksumInfo();
+ // Protobuf will convert an empty ByteString into a default value
object. Treat this as an error instead, since
+ // the default value will not represent the state of the container.
+ // The server does not deserialize the file before sending it, so we
must check the length on the client.
+ if (serializedChecksumInfo.isEmpty()) {
+ throw new StorageContainerException("Empty Container checksum file for
container " + containerId + " received",
+ ContainerProtos.Result.IO_EXCEPTION);
+ } else {
+ return
ContainerProtos.ContainerChecksumInfo.parseFrom(serializedChecksumInfo);
+ }
} finally {
this.xceiverClientManager.releaseClient(xceiverClient, false);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 40e5bfaba6..21fa76a7be 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -832,7 +832,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
case StreamInit : return DNAction.STREAM_INIT;
case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
case Echo : return DNAction.ECHO;
- case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
+ case GetContainerChecksumInfo: return DNAction.GET_CONTAINER_CHECKSUM_INFO;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 2559d29da6..00d00caa18 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -99,6 +99,10 @@ import
org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -109,6 +113,7 @@ import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
@@ -298,8 +303,8 @@ public class KeyValueHandler extends Handler {
return handler.handleFinalizeBlock(request, kvContainer);
case Echo:
return handler.handleEcho(request, kvContainer);
- case GetContainerMerkleTree:
- return handler.handleGetContainerMerkleTree(request, kvContainer);
+ case GetContainerChecksumInfo:
+ return handler.handleGetContainerChecksumInfo(request, kvContainer);
default:
return null;
}
@@ -671,10 +676,10 @@ public class KeyValueHandler extends Handler {
return getEchoResponse(request);
}
- ContainerCommandResponseProto handleGetContainerMerkleTree(
+ ContainerCommandResponseProto handleGetContainerChecksumInfo(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
- if (!request.hasGetContainerMerkleTree()) {
+ if (!request.hasGetContainerChecksumInfo()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed Read Container Merkle tree request. trace ID: {}",
request.getTraceID());
@@ -682,11 +687,25 @@ public class KeyValueHandler extends Handler {
return malformedRequest(request);
}
+ // A container must have moved past the CLOSING
KeyValueContainerData containerData = kvContainer.getContainerData();
+ State state = containerData.getState();
+ boolean stateSupportsChecksumInfo = (state == CLOSED || state ==
QUASI_CLOSED || state == UNHEALTHY);
+ if (!stateSupportsChecksumInfo) {
+ return ContainerCommandResponseProto.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setTraceID(request.getTraceID())
+ .setResult(UNCLOSED_CONTAINER_IO)
+ .setMessage("Checksum information is not available for containers in
state " + state)
+ .build();
+ }
+
ByteString checksumTree = null;
try {
checksumTree = checksumManager.getContainerChecksumInfo(containerData);
} catch (IOException ex) {
+ // For file not found or other inability to read the file, return an
error to the client.
+ LOG.error("Error occurred when reading checksum file for container {}",
containerData.getContainerID(), ex);
return ContainerCommandResponseProto.newBuilder()
.setCmdType(request.getCmdType())
.setTraceID(request.getTraceID())
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
index 2b32bb2fe9..0301304db7 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
@@ -123,6 +123,29 @@ public final class ContainerMerkleTreeTestUtils {
}
}
+ /**
+ * Builds a {@link ContainerMerkleTree} representing arbitrary data. This
can be used to test that the same
+ * structure is preserved throughout serialization, deserialization, and API
calls.
+ */
+ public static ContainerMerkleTree buildTestTree(ConfigurationSource conf) {
+ final long blockID1 = 1;
+ final long blockID2 = 2;
+ final long blockID3 = 3;
+ ContainerProtos.ChunkInfo b1c1 = buildChunk(conf, 0, ByteBuffer.wrap(new
byte[]{1, 2, 3}));
+ ContainerProtos.ChunkInfo b1c2 = buildChunk(conf, 1, ByteBuffer.wrap(new
byte[]{4, 5, 6}));
+ ContainerProtos.ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new
byte[]{7, 8, 9}));
+ ContainerProtos.ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new
byte[]{12, 11, 10}));
+ ContainerProtos.ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new
byte[]{13, 14, 15}));
+ ContainerProtos.ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new
byte[]{16, 17, 18}));
+
+ ContainerMerkleTree tree = new ContainerMerkleTree();
+ tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
+ tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
+ tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+
+ return tree;
+ }
+
/**
* This function checks whether the container checksum file exists.
*/
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
index 6b5fe7bbe1..b482d746ef 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -23,24 +23,33 @@ import
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
-import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildChunk;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree;
import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TestContainerChecksumTreeManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestContainerChecksumTreeManager.class);
+
private static final long CONTAINER_ID = 1L;
@TempDir
private File testDir;
@@ -97,7 +106,7 @@ class TestContainerChecksumTreeManager {
public void testWriteOnlyTreeToFile() throws Exception {
assertEquals(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total(),
0);
assertEquals(metrics.getCreateMerkleTreeLatencyNS().lastStat().total(), 0);
- ContainerMerkleTree tree = buildTestTree();
+ ContainerMerkleTree tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() >
0);
ContainerProtos.ContainerChecksumInfo checksumInfo =
readChecksumFile(container);
@@ -160,7 +169,7 @@ class TestContainerChecksumTreeManager {
List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
checksumManager.markBlocksAsDeleted(container, new
ArrayList<>(expectedBlocksToDelete));
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(),
0);
- ContainerMerkleTree tree = buildTestTree();
+ ContainerMerkleTree tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() >
0);
assertTrue(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total() >
0);
@@ -178,7 +187,7 @@ class TestContainerChecksumTreeManager {
assertEquals(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total(),
0);
assertEquals(metrics.getCreateMerkleTreeLatencyNS().lastStat().total(), 0);
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(),
0);
- ContainerMerkleTree tree = buildTestTree();
+ ContainerMerkleTree tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(),
0);
List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
@@ -198,7 +207,7 @@ class TestContainerChecksumTreeManager {
public void testReadContainerMerkleTreeMetric() throws Exception {
assertEquals(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total(),
0);
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(),
0);
- ContainerMerkleTree tree = buildTestTree();
+ ContainerMerkleTree tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(),
0);
checksumManager.writeContainerDataTree(container, tree);
@@ -206,28 +215,93 @@ class TestContainerChecksumTreeManager {
assertTrue(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total() >
0);
}
+ /**
+ * Updates to the container checksum file are written to a tmp file and then
swapped in to place. Test that when
+ * the write to the tmp file fails, the main file that is read from is left
intact.
+ */
+ @Test
+ public void testTmpFileWriteFailure() throws Exception {
+ File tmpFile =
ContainerChecksumTreeManager.getTmpContainerChecksumFile(container);
+ File finalFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container);
+
+ assertFalse(tmpFile.exists());
+ assertFalse(finalFile.exists());
+ ContainerMerkleTree tree = buildTestTree(config);
+ checksumManager.writeContainerDataTree(container, tree);
+ assertFalse(tmpFile.exists());
+ assertTrue(finalFile.exists());
+
+ // Make the write to the tmp file fail by removing permissions on its
parent.
+ assertTrue(tmpFile.getParentFile().setWritable(false));
+ try {
+ checksumManager.writeContainerDataTree(container, tree);
+ fail("Write to the tmp file should have failed.");
+ } catch (IOException ex) {
+ LOG.info("Write to the tmp file failed as expected with the following
exception: ", ex);
+ }
+ assertFalse(tmpFile.exists());
+ // The original file should still remain valid.
+ assertTrue(finalFile.exists());
+ assertTreesSortedAndMatch(tree.toProto(),
readChecksumFile(container).getContainerMerkleTree());
+ }
+
+ @Test
+ public void testCorruptedFile() throws Exception {
+ // Write file
+ File finalFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container);
+ assertFalse(finalFile.exists());
+ ContainerMerkleTree tree = buildTestTree(config);
+ checksumManager.writeContainerDataTree(container, tree);
+ assertTrue(finalFile.exists());
+
+ // Corrupt the file so it is not a valid protobuf.
+ Files.write(finalFile.toPath(), new byte[]{1, 2, 3},
+ StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+
+ // Direct read should throw to verify the proto is not valid.
+ assertThrows(IOException.class, () -> readChecksumFile(container));
+
+ // The manager's read/modify/write cycle should account for the corruption
and overwrite the entry.
+ // No exception should be thrown.
+ checksumManager.writeContainerDataTree(container, tree);
+ assertTreesSortedAndMatch(tree.toProto(),
readChecksumFile(container).getContainerMerkleTree());
+ }
+
+ /**
+ * An empty file will be interpreted by protobuf to be an object with
default values.
+ * The checksum manager should overwrite this if it is encountered.
+ */
+ @Test
+ public void testEmptyFile() throws Exception {
+ // Write file
+ File finalFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container);
+ assertFalse(finalFile.exists());
+ ContainerMerkleTree tree = buildTestTree(config);
+ checksumManager.writeContainerDataTree(container, tree);
+ assertTrue(finalFile.exists());
+
+ // Truncate the file to zero length.
+ Files.write(finalFile.toPath(), new byte[0],
+ StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+ assertEquals(0, finalFile.length());
+
+ // The truncated file will be interpreted as an empty protobuf object.
+ // Use a test helper method to read it directly and confirm this.
+ ContainerProtos.ContainerChecksumInfo emptyInfo =
readChecksumFile(container);
+ assertFalse(emptyInfo.hasContainerID());
+ assertFalse(emptyInfo.hasContainerMerkleTree());
+
+ // The manager's read/modify/write cycle should account for the empty file
and overwrite it with a valid entry.
+ // No exception should be thrown.
+ checksumManager.writeContainerDataTree(container, tree);
+ ContainerProtos.ContainerChecksumInfo info = readChecksumFile(container);
+ assertTreesSortedAndMatch(tree.toProto(), info.getContainerMerkleTree());
+ assertEquals(CONTAINER_ID, info.getContainerID());
+ }
+
@Test
public void testChecksumTreeFilePath() {
assertEquals(checksumFile.getAbsolutePath(),
ContainerChecksumTreeManager.getContainerChecksumFile(container).getAbsolutePath());
}
-
- private ContainerMerkleTree buildTestTree() {
- final long blockID1 = 1;
- final long blockID2 = 2;
- final long blockID3 = 3;
- ContainerProtos.ChunkInfo b1c1 = buildChunk(config, 0, ByteBuffer.wrap(new
byte[]{1, 2, 3}));
- ContainerProtos.ChunkInfo b1c2 = buildChunk(config, 1, ByteBuffer.wrap(new
byte[]{4, 5, 6}));
- ContainerProtos.ChunkInfo b2c1 = buildChunk(config, 0, ByteBuffer.wrap(new
byte[]{7, 8, 9}));
- ContainerProtos.ChunkInfo b2c2 = buildChunk(config, 1, ByteBuffer.wrap(new
byte[]{12, 11, 10}));
- ContainerProtos.ChunkInfo b3c1 = buildChunk(config, 0, ByteBuffer.wrap(new
byte[]{13, 14, 15}));
- ContainerProtos.ChunkInfo b3c2 = buildChunk(config, 1, ByteBuffer.wrap(new
byte[]{16, 17, 18}));
-
- ContainerMerkleTree tree = new ContainerMerkleTree();
- tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
- tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
- tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
-
- return tree;
- }
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index e13ba38181..30a8a9bcbc 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -22,9 +22,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.EnumSet;
import java.util.List;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +40,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -62,6 +65,9 @@ import org.apache.ozone.test.GenericTestUtils;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.assertj.core.api.Assertions.assertThat;
@@ -485,6 +491,34 @@ public class TestKeyValueHandler {
Assertions.assertEquals(1, icrCount.get());
}
+ @Test
+ public void testGetContainerChecksumInfoOnInvalidContainerStates() {
+ when(handler.handleGetContainerChecksumInfo(any(),
any())).thenCallRealMethod();
+
+ // Only mock what is necessary for the request to fail. This test does not
cover allowed states.
+ KeyValueContainer container = mock(KeyValueContainer.class);
+ KeyValueContainerData containerData = mock(KeyValueContainerData.class);
+ when(container.getContainerData()).thenReturn(containerData);
+
+ ContainerCommandRequestProto request =
mock(ContainerCommandRequestProto.class);
+ when(request.hasGetContainerChecksumInfo()).thenReturn(true);
+
when(request.getCmdType()).thenReturn(ContainerProtos.Type.GetContainerChecksumInfo);
+ when(request.getTraceID()).thenReturn("123");
+
+ Set<State> disallowedStates = EnumSet.allOf(State.class);
+ disallowedStates.removeAll(EnumSet.of(CLOSED, QUASI_CLOSED, UNHEALTHY));
+
+ for (State state: disallowedStates) {
+ when(containerData.getState()).thenReturn(state);
+ ContainerProtos.ContainerCommandResponseProto response =
handler.handleGetContainerChecksumInfo(request,
+ container);
+ assertNotNull(response);
+ assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO,
response.getResult());
+ assertTrue(response.getMessage().contains(state.toString()), "Response
message did not contain the container " +
+ "state " + state);
+ }
+ }
+
private static ContainerCommandRequestProto createContainerRequest(
String datanodeId, long containerID) {
return ContainerCommandRequestProto.newBuilder()
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 4533a43403..66af02c83a 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -107,7 +107,7 @@ enum Type {
StreamWrite = 20;
FinalizeBlock = 21;
Echo = 22;
- GetContainerMerkleTree = 23;
+ GetContainerChecksumInfo = 23;
}
@@ -216,7 +216,7 @@ message ContainerCommandRequestProto {
optional uint32 version = 24;
optional FinalizeBlockRequestProto finalizeBlock = 25;
optional EchoRequestProto echo = 26;
- optional GetContainerMerkleTreeRequestProto getContainerMerkleTree = 27;
+ optional GetContainerChecksumInfoRequestProto getContainerChecksumInfo =
27;
}
message ContainerCommandResponseProto {
@@ -248,7 +248,7 @@ message ContainerCommandResponseProto {
optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
optional FinalizeBlockResponseProto finalizeBlock = 22;
optional EchoResponseProto echo = 23;
- optional GetContainerMerkleTreeResponseProto getContainerMerkleTree = 24;
+ optional GetContainerChecksumInfoResponseProto getContainerChecksumInfo =
24;
}
message ContainerDataProto {
@@ -403,13 +403,13 @@ message EchoResponseProto {
optional bytes payload = 1;
}
-message GetContainerMerkleTreeRequestProto {
+message GetContainerChecksumInfoRequestProto {
optional int64 containerID = 1;
}
-message GetContainerMerkleTreeResponseProto {
+message GetContainerChecksumInfoResponseProto {
optional int64 containerID = 1;
- optional bytes containerMerkleTree = 2;
+ optional bytes containerChecksumInfo = 2;
}
// Chunk Operations
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index e0bbe32abb..0d0bd3d83e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -249,7 +249,7 @@ public class TestOzoneContainerWithTLS {
new DNContainerOperationClient(conf, caClient, keyClient)) {
client = clientManager.acquireClient(pipeline);
long containerId = createAndCloseContainer(client,
containerTokenEnabled);
- dnClient.getContainerMerkleTree(containerId, dn);
+ dnClient.getContainerChecksumInfo(containerId, dn);
} finally {
if (container != null) {
container.stop();
@@ -275,16 +275,16 @@ public class TestOzoneContainerWithTLS {
TokenHelper tokenHelper = new TokenHelper(new SecurityConfig(conf),
keyClient);
String containerToken = encode(tokenHelper.getContainerToken(
ContainerID.valueOf(containerId)));
- ContainerProtos.GetContainerMerkleTreeResponseProto response =
- ContainerProtocolCalls.getContainerMerkleTree(client,
+ ContainerProtos.GetContainerChecksumInfoResponseProto response =
+ ContainerProtocolCalls.getContainerChecksumInfo(client,
containerId, containerToken);
// Getting container merkle tree with valid container token
- assertFalse(response.getContainerMerkleTree().isEmpty());
+ assertFalse(response.getContainerChecksumInfo().isEmpty());
// Getting container merkle tree with invalid container token
XceiverClientSpi finalClient = client;
StorageContainerException exception =
assertThrows(StorageContainerException.class,
- () -> ContainerProtocolCalls.getContainerMerkleTree(
+ () -> ContainerProtocolCalls.getContainerChecksumInfo(
finalClient, containerId, "invalidContainerToken"));
assertEquals(ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED,
exception.getResult());
} finally {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
index bc235292d2..64af0148a8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
@@ -18,14 +18,14 @@
package org.apache.hadoop.ozone.dn.checksum;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -34,41 +34,43 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.ozone.test.GenericTestUtils;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
-import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
-import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
-import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildChunk;
+import static
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This class tests container commands for reconciliation.
*/
public class TestContainerCommandReconciliation {
- private static final Logger LOG = LoggerFactory
- .getLogger(TestContainerCommandReconciliation.class);
private static MiniOzoneCluster cluster;
private static OzoneClient rpcClient;
private static ObjectStore store;
@@ -83,11 +85,10 @@ public class TestContainerCommandReconciliation {
testDir = GenericTestUtils.getTestDir(
TestContainerCommandReconciliation.class.getSimpleName());
conf = new OzoneConfiguration();
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- conf.getBoolean(OZONE_SECURITY_ENABLED_KEY,
- OZONE_SECURITY_ENABLED_DEFAULT);
+ // Disable the container scanner so it does not create merkle tree files
that interfere with this test.
+ conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.build();
@@ -108,42 +109,172 @@ public class TestContainerCommandReconciliation {
}
}
+ /**
+ * Container checksum trees are only generated for non-open containers.
+   * Calling the API on an open container should fail.
+   */
@Test
- public void testGetContainerMerkleTree() throws Exception {
- final String volumeName = UUID.randomUUID().toString();
- final String bucketName = UUID.randomUUID().toString();
- final String keyName = UUID.randomUUID().toString();
- byte[] data = "Test content".getBytes(UTF_8);
- store.createVolume(volumeName);
- OzoneVolume volume = store.getVolume(volumeName);
- volume.createBucket(bucketName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- // Write Key
- try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) {
- IOUtils.write(data, os);
+ public void testGetChecksumInfoOpenReplica() throws Exception {
+ long containerID = writeDataAndGetContainer(false);
+ HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+ StorageContainerException ex =
assertThrows(StorageContainerException.class,
+ () -> dnClient.getContainerChecksumInfo(containerID,
targetDN.getDatanodeDetails()));
+ assertEquals(ex.getResult(), ContainerProtos.Result.UNCLOSED_CONTAINER_IO);
+ }
+
+ /**
+ * Tests reading the container checksum info file from a datanode who does
not have a replica for the requested
+ * container.
+ */
+ @Test
+ public void testGetChecksumInfoNonexistentReplica() {
+ HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+
+ // Find a container ID that does not exist in the cluster. For a small
test this should be a good starting
+ // point, but modify it just in case.
+ long badIDCheck = 1_000_000;
+ while (cluster.getStorageContainerManager().getContainerManager()
+ .containerExist(ContainerID.valueOf(badIDCheck))) {
+ badIDCheck++;
}
- // Close container
- ContainerInfo container = cluster.getStorageContainerManager()
- .getContainerManager().getContainers().get(0);
- closeContainer(container);
+ final long nonexistentContainerID = badIDCheck;
+ StorageContainerException ex =
assertThrows(StorageContainerException.class,
+ () -> dnClient.getContainerChecksumInfo(nonexistentContainerID,
targetDN.getDatanodeDetails()));
+ assertEquals(ex.getResult(), ContainerProtos.Result.CONTAINER_NOT_FOUND);
+ }
+
+ /**
+ * Tests reading the container checksum info file from a datanode where the
container exists, but the file has not
+ * yet been created.
+ */
+ @Test
+ public void testGetChecksumInfoNonexistentFile() throws Exception {
+ long containerID = writeDataAndGetContainer(true);
+ // Pick a datanode and remove its checksum file.
+ HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+ Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID);
+ File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ // Closing the container should have generated the tree file.
+ assertTrue(treeFile.exists());
+ assertTrue(treeFile.delete());
+
+ StorageContainerException ex =
assertThrows(StorageContainerException.class, () ->
+ dnClient.getContainerChecksumInfo(containerID,
targetDN.getDatanodeDetails()));
+ assertEquals(ContainerProtos.Result.IO_EXCEPTION, ex.getResult());
+ assertTrue(ex.getMessage().contains("(No such file or directory"),
ex.getMessage() +
+ " did not contain the expected string");
+ }
+
+ /**
+ * Tests reading the container checksum info file from a datanode where the
datanode fails to read the file from
+ * the disk.
+ */
+ @Test
+ public void testGetChecksumInfoServerIOError() throws Exception {
+ long containerID = writeDataAndGetContainer(true);
+    // Pick a datanode and make its checksum file unreadable.
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+ Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID);
+ File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ assertTrue(treeFile.exists());
+ // Make the server unable to read the file.
+ assertTrue(treeFile.setReadable(false));
+
+ StorageContainerException ex =
assertThrows(StorageContainerException.class, () ->
+ dnClient.getContainerChecksumInfo(containerID,
targetDN.getDatanodeDetails()));
+ assertEquals(ContainerProtos.Result.IO_EXCEPTION, ex.getResult());
+ }
- //Write Checksum Data
- ContainerMerkleTree tree = buildTestTree();
- writeChecksumFileToDatanode(container, tree);
+ /**
+ * Tests reading the container checksum info file from a datanode where the
file is corrupt.
+ * The datanode does not deserialize the file before sending it, so there
should be no error on the server side
+ * when sending the file. The client should raise an error trying to
deserialize it.
+ */
+ @Test
+ public void testGetCorruptChecksumInfo() throws Exception {
+ long containerID = writeDataAndGetContainer(true);
+
+ // Pick a datanode and corrupt its checksum file.
+ HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+ Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID);
+ File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ Files.write(treeFile.toPath(), new byte[]{1, 2, 3},
+ StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+
+ // Reading the file from the replica should fail when the client tries to
deserialize it.
+ assertThrows(InvalidProtocolBufferException.class, () ->
dnClient.getContainerChecksumInfo(containerID,
+ targetDN.getDatanodeDetails()));
+ }
+
+ @Test
+ public void testGetEmptyChecksumInfo() throws Exception {
+ long containerID = writeDataAndGetContainer(true);
- // Verify all the ContainerMerkle Tree matches
+ // Pick a datanode and truncate its checksum file to zero length.
+ HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+ Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID);
+ File treeFile =
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+ // TODO After HDDS-10379 the file will already exist and need to be
overwritten.
+ assertTrue(treeFile.exists());
+ Files.write(treeFile.toPath(), new byte[]{},
+ StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+ assertEquals(0, treeFile.length());
+
+ // The client will get an empty byte string back. It should raise this as
an error instead of returning a default
+ // protobuf object.
+ StorageContainerException ex =
assertThrows(StorageContainerException.class, () ->
+ dnClient.getContainerChecksumInfo(containerID,
targetDN.getDatanodeDetails()));
+ assertEquals(ContainerProtos.Result.IO_EXCEPTION, ex.getResult());
+ }
+
+ @Test
+ public void testGetChecksumInfoSuccess() throws Exception {
+ long containerID = writeDataAndGetContainer(true);
+ // Overwrite the existing tree with a custom one for testing. We will
check that it is returned properly from the
+ // API.
+ ContainerMerkleTree tree = buildTestTree(conf);
+ writeChecksumFileToDatanodes(containerID, tree);
+
+ // Verify trees match on all replicas.
+ // This test is expecting Ratis 3 data written on a 3 node cluster, so
every node has a replica.
+ assertEquals(3, cluster.getHddsDatanodes().size());
List<DatanodeDetails> datanodeDetails = cluster.getHddsDatanodes().stream()
.map(HddsDatanodeService::getDatanodeDetails).collect(Collectors.toList());
for (DatanodeDetails dn: datanodeDetails) {
- ByteString merkleTree =
dnClient.getContainerMerkleTree(container.getContainerID(), dn);
ContainerProtos.ContainerChecksumInfo containerChecksumInfo =
- ContainerProtos.ContainerChecksumInfo.parseFrom(merkleTree);
+ dnClient.getContainerChecksumInfo(containerID, dn);
assertTreesSortedAndMatch(tree.toProto(),
containerChecksumInfo.getContainerMerkleTree());
}
}
- public static void writeChecksumFileToDatanode(ContainerInfo container,
ContainerMerkleTree tree) throws Exception {
+ private long writeDataAndGetContainer(boolean close) throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ byte[] data = "Test content".getBytes(UTF_8);
+ // Write Key
+ try (OzoneOutputStream os = TestHelper.createKey("testkey", RATIS, THREE,
0, store, volumeName, bucketName)) {
+ IOUtils.write(data, os);
+ }
+
+ long containerID = bucket.getKey("testkey").getOzoneKeyLocations().stream()
+ .findFirst().get().getContainerID();
+ if (close) {
+ TestHelper.waitForContainerClose(cluster, containerID);
+ }
+ return containerID;
+ }
+
+ public static void writeChecksumFileToDatanodes(long containerID,
ContainerMerkleTree tree) throws Exception {
// Write Container Merkle Tree
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
KeyValueHandler keyValueHandler =
@@ -151,46 +282,9 @@ public class TestContainerCommandReconciliation {
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
KeyValueContainer keyValueContainer =
(KeyValueContainer)
dn.getDatanodeStateMachine().getContainer().getController()
- .getContainer(container.getContainerID());
+ .getContainer(containerID);
keyValueHandler.getChecksumManager().writeContainerDataTree(
keyValueContainer.getContainerData(), tree);
}
}
-
- public static void closeContainer(ContainerInfo container) throws Exception {
- //Close the container first.
-
cluster.getStorageContainerManager().getClientProtocolServer().closeContainer(container.getContainerID());
- GenericTestUtils.waitFor(() -> checkContainerState(container), 100, 50000);
- }
-
- private static boolean checkContainerState(ContainerInfo container) {
- ContainerInfo containerInfo = null;
- try {
- containerInfo = cluster.getStorageContainerManager()
- .getContainerInfo(container.getContainerID());
- return containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED;
- } catch (IOException e) {
- LOG.error("Error when getting the container info", e);
- }
- return false;
- }
-
- public static ContainerMerkleTree buildTestTree() {
- final long blockID1 = 1;
- final long blockID2 = 2;
- final long blockID3 = 3;
- ContainerProtos.ChunkInfo b1c1 = buildChunk(conf, 0, ByteBuffer.wrap(new
byte[]{1, 2, 3}));
- ContainerProtos.ChunkInfo b1c2 = buildChunk(conf, 1, ByteBuffer.wrap(new
byte[]{4, 5, 6}));
- ContainerProtos.ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new
byte[]{7, 8, 9}));
- ContainerProtos.ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new
byte[]{12, 11, 10}));
- ContainerProtos.ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new
byte[]{13, 14, 15}));
- ContainerProtos.ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new
byte[]{16, 17, 18}));
-
- ContainerMerkleTree tree = new ContainerMerkleTree();
- tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
- tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
- tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
-
- return tree;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]