This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-10239-container-reconciliation by this push:
     new 930d3f0552 HDDS-10372. SCM and Datanode communication for reconciliation (#6506)
930d3f0552 is described below
commit 930d3f0552b86d6605625cabe5e8961652d14fa2
Author: Ethan Rose <[email protected]>
AuthorDate: Wed May 29 11:05:42 2024 -0700
HDDS-10372. SCM and Datanode communication for reconciliation (#6506)
---
.../apache/hadoop/hdds/scm/client/ScmClient.java | 8 +
.../hdds/scm/container/ContainerReplicaInfo.java | 26 +-
.../hadoop/hdds/scm/exceptions/SCMException.java | 3 +-
.../protocol/StorageContainerLocationProtocol.java | 8 +
.../container/common/helpers/ContainerUtils.java | 10 +-
.../ozone/container/common/impl/ContainerData.java | 33 ++-
.../container/common/impl/ContainerDataYaml.java | 4 +-
.../ozone/container/common/interfaces/Handler.java | 9 +
.../common/statemachine/DatanodeStateMachine.java | 2 +
.../ReconcileContainerCommandHandler.java | 110 ++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 6 +
.../container/keyvalue/KeyValueContainer.java | 3 +-
.../container/keyvalue/KeyValueContainerCheck.java | 2 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 20 ++
.../keyvalue/helpers/KeyValueContainerUtil.java | 2 +-
.../container/ozoneimpl/ContainerController.java | 11 +
.../container/replication/ContainerImporter.java | 2 +-
.../commands/ReconcileContainerCommand.java | 88 ++++++
.../common/TestKeyValueContainerData.java | 4 +
.../TestSchemaOneBackwardsCompatibility.java | 2 +-
.../common/impl/TestContainerDataYaml.java | 34 ++-
.../common/statemachine/TestStateContext.java | 4 +
.../TestReconcileContainerCommandHandler.java | 203 +++++++++++++
.../states/endpoint/TestHeartbeatEndpointTask.java | 38 +++
.../container/keyvalue/TestKeyValueHandler.java | 37 +++
.../replication/TestContainerImporter.java | 4 +-
...inerLocationProtocolClientSideTranslatorPB.java | 10 +
.../src/main/proto/ScmAdminProtocol.proto | 10 +
.../interface-client/src/main/proto/hdds.proto | 1 +
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 14 +-
.../src/main/proto/ScmServerProtocol.proto | 1 +
.../container/AbstractContainerReportHandler.java | 1 +
.../hdds/scm/container/ContainerReplica.java | 12 +
.../ReconcileContainerEventHandler.java | 94 ++++++
.../ReconciliationEligibilityHandler.java | 141 +++++++++
.../scm/container/reconciliation/package-info.java | 21 ++
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 7 +
...inerLocationProtocolServerSideTranslatorPB.java | 13 +
.../hdds/scm/server/SCMClientProtocolServer.java | 44 ++-
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 8 +
.../hdds/scm/server/StorageContainerManager.java | 5 +
.../org/apache/hadoop/ozone/audit/SCMAction.java | 3 +-
.../scm/container/TestContainerReportHandler.java | 152 +++++++++-
.../TestIncrementalContainerReportHandler.java | 143 +++++++++-
.../TestReconcileContainerEventHandler.java | 314 +++++++++++++++++++++
.../hdds/scm/cli/ContainerOperationClient.java | 4 +
.../hdds/scm/cli/container/ContainerCommands.java | 3 +-
.../scm/cli/container/ReconcileSubcommand.java | 50 ++++
.../src/main/smoketest/admincli/container.robot | 67 +++--
49 files changed, 1730 insertions(+), 61 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 14fb0a40cd..1bfdfd34c4 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -462,4 +462,12 @@ public interface ScmClient extends Closeable {
String scmId) throws IOException;
String getMetrics(String query) throws IOException;
+
+ /**
+ * Trigger a reconcile command to datanodes for a container ID.
+ *
+ * @param containerID The ID of the container to reconcile.
+ * @throws IOException On error
+ */
+ void reconcileContainer(long containerID) throws IOException;
}
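
For illustration only (not part of this patch): a minimal sketch of how the new client-facing call might be exercised, assuming a reachable SCM and assuming ContainerOperationClient (touched elsewhere in this change) remains the usual ScmClient implementation with a configuration-based constructor.

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
    import org.apache.hadoop.hdds.scm.client.ScmClient;

    public final class ReconcileTrigger {
      public static void main(String[] args) throws Exception {
        // Assumes SCM connection details are present in the loaded configuration.
        try (ScmClient scm = new ContainerOperationClient(new OzoneConfiguration())) {
          // Asks SCM to queue reconcile commands for the datanodes holding container 12.
          scm.reconcileContainer(12L);
        }
      }
    }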
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
index 5a81f6bb47..a239cbfdba 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
@@ -17,9 +17,14 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import java.io.IOException;
import java.util.UUID;
/**
@@ -35,6 +40,8 @@ public final class ContainerReplicaInfo {
private long keyCount;
private long bytesUsed;
private int replicaIndex = -1;
+ @JsonSerialize(using = LongToHexJsonSerializer.class)
+ private long dataChecksum;
public static ContainerReplicaInfo fromProto(
HddsProtos.SCMContainerReplicaProto proto) {
@@ -48,7 +55,8 @@ public final class ContainerReplicaInfo {
.setKeyCount(proto.getKeyCount())
.setBytesUsed(proto.getBytesUsed())
.setReplicaIndex(
- proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1);
+ proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1)
+ .setDataChecksum(proto.getDataChecksum());
return builder.build();
}
@@ -87,6 +95,17 @@ public final class ContainerReplicaInfo {
return replicaIndex;
}
+ public long getDataChecksum() {
+ return dataChecksum;
+ }
+
+ private static class LongToHexJsonSerializer extends JsonSerializer<Long> {
+ @Override
+ public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeString(Long.toHexString(value));
+ }
+ }
+
/**
* Builder for ContainerReplicaInfo class.
*/
@@ -134,6 +153,11 @@ public final class ContainerReplicaInfo {
return this;
}
+ public Builder setDataChecksum(long dataChecksum) {
+ subject.dataChecksum = dataChecksum;
+ return this;
+ }
+
public ContainerReplicaInfo build() {
return subject;
}
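
For illustration only (not part of this patch): a self-contained sketch of the effect of the @JsonSerialize(using = LongToHexJsonSerializer.class) annotation added above. The Replica/HexSerializer names below are hypothetical stand-ins; the point is that the long checksum is rendered as a hex string when the replica info is written out as JSON.

    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.JsonSerializer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    import java.io.IOException;

    public class HexChecksumDemo {
      static class Replica {
        @JsonSerialize(using = HexSerializer.class)
        public long dataChecksum = 0xCAFEBABEL;
      }

      static class HexSerializer extends JsonSerializer<Long> {
        @Override
        public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
          gen.writeString(Long.toHexString(value));
        }
      }

      public static void main(String[] args) throws IOException {
        // Prints {"dataChecksum":"cafebabe"}
        System.out.println(new ObjectMapper().writeValueAsString(new Replica()));
      }
    }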
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 1cebd3296e..fad6feca0b 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -139,6 +139,7 @@ public class SCMException extends IOException {
CA_ROTATION_IN_PROGRESS,
CA_ROTATION_IN_POST_PROGRESS,
CONTAINER_ALREADY_CLOSED,
- CONTAINER_ALREADY_CLOSING
+ CONTAINER_ALREADY_CLOSING,
+ UNSUPPORTED_OPERATION
}
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index df8ed02cf7..e53bd73eb0 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -483,4 +483,12 @@ public interface StorageContainerLocationProtocol extends Closeable {
String scmId) throws IOException;
String getMetrics(String query) throws IOException;
+
+ /**
+ * Trigger a reconcile command to datanodes for the current container ID.
+ *
+ * @param containerID The ID of the container to reconcile.
+ * @throws IOException On error
+ */
+ void reconcileContainer(long containerID) throws IOException;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index b89ecff48c..22a7408642 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -194,21 +194,21 @@ public final class ContainerUtils {
* Verify that the checksum stored in containerData is equal to the
* computed checksum.
*/
- public static void verifyChecksum(ContainerData containerData,
+ public static void verifyContainerFileChecksum(ContainerData containerData,
ConfigurationSource conf) throws IOException {
boolean enabled = conf.getBoolean(
HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED,
HddsConfigKeys.
HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT);
if (enabled) {
- String storedChecksum = containerData.getChecksum();
+ String storedChecksum = containerData.getContainerFileChecksum();
Yaml yaml = ContainerDataYaml.getYamlForContainerType(
containerData.getContainerType(),
containerData instanceof KeyValueContainerData &&
((KeyValueContainerData)containerData).getReplicaIndex() > 0);
- containerData.computeAndSetChecksum(yaml);
- String computedChecksum = containerData.getChecksum();
+ containerData.computeAndSetContainerFileChecksum(yaml);
+ String computedChecksum = containerData.getContainerFileChecksum();
if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) {
throw new StorageContainerException("Container checksum error for " +
@@ -225,7 +225,7 @@ public final class ContainerUtils {
* @param containerDataYamlStr ContainerData as a Yaml String
* @return Checksum of the container data
*/
- public static String getChecksum(String containerDataYamlStr)
+ public static String getContainerFileChecksum(String containerDataYamlStr)
throws StorageContainerException {
MessageDigest sha;
try {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 3c202ba60a..4e3f2a7d53 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -99,8 +99,12 @@ public abstract class ContainerData {
private HddsVolume volume;
+ // Checksum of just the container file.
private String checksum;
+ // Checksum of the data within the container.
+ private long dataChecksum;
+
private boolean isEmpty;
private int replicaIndex;
@@ -112,7 +116,7 @@ public abstract class ContainerData {
private transient Optional<Instant> lastDataScanTime = Optional.empty();
public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8;
- private static final String DUMMY_CHECKSUM = new String(new byte[64],
+ private static final String ZERO_CHECKSUM = new String(new byte[64],
CHARSET_ENCODING);
// Common Fields need to be stored in .container file.
@@ -159,7 +163,8 @@ public abstract class ContainerData {
this.originPipelineId = originPipelineId;
this.originNodeId = originNodeId;
this.isEmpty = false;
- setChecksumTo0ByteArray();
+ this.checksum = ZERO_CHECKSUM;
+ this.dataChecksum = 0;
}
protected ContainerData(ContainerData source) {
@@ -571,15 +576,11 @@ public abstract class ContainerData {
this.blockCount.set(count);
}
- public void setChecksumTo0ByteArray() {
- this.checksum = DUMMY_CHECKSUM;
- }
-
- public void setChecksum(String checkSum) {
+ public void setContainerFileChecksum(String checkSum) {
this.checksum = checkSum;
}
- public String getChecksum() {
+ public String getContainerFileChecksum() {
return this.checksum;
}
@@ -630,21 +631,29 @@ public abstract class ContainerData {
*
* Checksum of ContainerData is calculated by setting the
* {@link ContainerData#checksum} field to a 64-byte array with all 0's -
- * {@link ContainerData#DUMMY_CHECKSUM}. After the checksum is calculated,
+ * {@link ContainerData#ZERO_CHECKSUM}. After the checksum is calculated,
* the checksum field is updated with this value.
*
* @param yaml Yaml for ContainerType to get the ContainerData as Yaml String
* @throws IOException
*/
- public void computeAndSetChecksum(Yaml yaml) throws IOException {
+ public void computeAndSetContainerFileChecksum(Yaml yaml) throws IOException {
// Set checksum to dummy value - 0 byte array, to calculate the checksum
// of rest of the data.
- setChecksumTo0ByteArray();
+ this.checksum = ZERO_CHECKSUM;
// Dump yaml data into a string to compute its checksum
String containerDataYamlStr = yaml.dump(this);
- this.checksum = ContainerUtils.getChecksum(containerDataYamlStr);
+ this.checksum = ContainerUtils.getContainerFileChecksum(containerDataYamlStr);
+ }
+
+ public void setDataChecksum(long dataChecksum) {
+ this.dataChecksum = dataChecksum;
+ }
+
+ public long getDataChecksum() {
+ return dataChecksum;
}
/**
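
For illustration only (not part of this patch): a standalone sketch of the container file checksum idea described in the javadoc above, in which the checksum field is first zeroed, the data is dumped to a YAML string, and the string is hashed. The use of SHA-256 here is an assumption; the real digest choice lives in ContainerUtils.getContainerFileChecksum.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public final class ContainerFileChecksumSketch {
      // Hashes the YAML text of a ContainerData whose checksum field was already zeroed.
      public static String checksumOf(String containerYamlWithZeroedChecksum) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(containerYamlWithZeroedChecksum.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
          hex.append(String.format("%02x", b));
        }
        return hex.toString();
      }
    }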
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
index a4750b5fae..140a462676 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -98,7 +98,7 @@ public final class ContainerDataYaml {
// Create Yaml for given container type
Yaml yaml = getYamlForContainerType(containerType, withReplicaIndex);
// Compute Checksum and update ContainerData
- containerData.computeAndSetChecksum(yaml);
+ containerData.computeAndSetContainerFileChecksum(yaml);
// Write the ContainerData with checksum to Yaml file.
out = new FileOutputStream(
@@ -312,7 +312,7 @@ public final class ContainerDataYaml {
kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH));
Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
kvData.setMetadata(meta);
- kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
+ kvData.setContainerFileChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
Long timestamp = (Long) nodes.get(OzoneConsts.DATA_SCAN_TIMESTAMP);
kvData.setDataScanTimestamp(timestamp);
String state = (String) nodes.get(OzoneConsts.STATE);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 2ffb9d30d1..179274f2c0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.ozone.container.common.interfaces;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -192,6 +194,13 @@ public abstract class Handler {
public abstract void deleteContainer(Container container, boolean force)
throws IOException;
+ /**
+ * Triggers reconciliation of this container replica's data with its peers.
+ * @param container container to be reconciled.
+ * @param peers The other datanodes with a copy of this container whose data should be checked.
+ */
+ public abstract void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException;
+
/**
* Deletes the given files associated with a block of the container.
*
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 9677144054..9292dba5fd 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -56,6 +56,7 @@ import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Crea
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
+import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconcileContainerCommandHandler;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
@@ -258,6 +259,7 @@ public class DatanodeStateMachine implements Closeable {
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
+ .addHandler(new ReconcileContainerCommandHandler(threadNamePrefix))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
new file mode 100644
index 0000000000..9a4110c7df
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers.
+ */
+public class ReconcileContainerCommandHandler implements CommandHandler {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReconcileContainerCommandHandler.class);
+
+ private final AtomicLong invocationCount;
+ private final AtomicInteger queuedCount;
+ private final ExecutorService executor;
+ private long totalTime;
+
+ public ReconcileContainerCommandHandler(String threadNamePrefix) {
+ invocationCount = new AtomicLong(0);
+ queuedCount = new AtomicInteger(0);
+ // TODO Allow configurable thread pool size with a default value when the implementation is ready.
+ executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
+ .build());
+ totalTime = 0;
+ }
+
+ @Override
+ public void handle(SCMCommand command, OzoneContainer container, StateContext context,
+ SCMConnectionManager connectionManager) {
+ queuedCount.incrementAndGet();
+ CompletableFuture.runAsync(() -> {
+ invocationCount.incrementAndGet();
+ long startTime = Time.monotonicNow();
+ ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command;
+ LOG.info("Processing reconcile container command for container {} with peers {}",
+ reconcileCommand.getContainerID(), reconcileCommand.getPeerDatanodes());
+ try {
+ container.getController().reconcileContainer(reconcileCommand.getContainerID(),
+ reconcileCommand.getPeerDatanodes());
+ } catch (IOException ex) {
+ LOG.error("Failed to reconcile container {}.", reconcileCommand.getContainerID(), ex);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+ }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+ }
+
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.reconcileContainerCommand;
+ }
+
+ @Override
+ public int getInvocationCount() {
+ return (int)invocationCount.get();
+ }
+
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
+ }
+ return 0;
+ }
+
+ @Override
+ public long getTotalRunTime() {
+ return totalTime;
+ }
+
+ @Override
+ public int getQueuedCount() {
+ return queuedCount.get();
+ }
+}
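
For illustration only (not part of this patch): the handler above follows a common queue-and-measure pattern — increment the queued count when a command arrives, run the work on a dedicated executor, accumulate run time, and only decrement the queued count once the task completes. A minimal self-contained sketch of that pattern, with hypothetical names:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    final class AsyncHandlerSketch {
      private final AtomicLong invocationCount = new AtomicLong();
      private final AtomicInteger queuedCount = new AtomicInteger();
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      private volatile long totalTimeMillis;

      void handle(Runnable work) {
        queuedCount.incrementAndGet();
        CompletableFuture.runAsync(() -> {
          invocationCount.incrementAndGet();
          long start = System.nanoTime();
          try {
            work.run();
          } finally {
            // Only the single executor thread updates this, so a plain add is safe here.
            totalTimeMillis += (System.nanoTime() - start) / 1_000_000;
          }
        }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
      }
    }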
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 44f0eae49e..b6ab4748fe 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -55,6 +55,7 @@ import
org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -416,6 +417,11 @@ public class HeartbeatEndpointTask
commandResponseProto.getRefreshVolumeUsageCommandProto());
processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
break;
+ case reconcileContainerCommand:
+ ReconcileContainerCommand reconcileContainerCommand =
+ ReconcileContainerCommand.getFromProtobuf(commandResponseProto.getReconcileContainerCommandProto());
+ processCommonCommand(commandResponseProto, reconcileContainerCommand);
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 8388182667..de43a29d9f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -885,7 +885,8 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
.setDeleteTransactionId(containerData.getDeleteTransactionId())
.setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
.setOriginNodeId(containerData.getOriginNodeId())
- .setIsEmpty(containerData.isEmpty());
+ .setIsEmpty(containerData.isEmpty())
+ .setDataChecksum(containerData.getDataChecksum());
return ciBuilder.build();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 70539111fb..f462510bf7 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -177,7 +177,7 @@ public class KeyValueContainerCheck {
.checkState(onDiskContainerData != null, "Container File not loaded");
try {
- ContainerUtils.verifyChecksum(onDiskContainerData, checkConfig);
+ ContainerUtils.verifyContainerFileChecksum(onDiskContainerData, checkConfig);
} catch (IOException ex) {
return
ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
containerFile, ex);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index e575a93de2..2843fe3bcf 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -56,6 +57,8 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
+import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
@@ -1160,6 +1163,23 @@ public class KeyValueHandler extends Handler {
deleteInternal(container, force);
}
+ @Override
+ public void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException {
+ // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
+ ContainerData data = container.getContainerData();
+ long id = data.getContainerID();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
+ .putLong(id)
+ .asReadOnlyBuffer();
+ byteBuffer.rewind();
+ ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
+ checksumImpl.update(byteBuffer);
+ long dataChecksum = checksumImpl.getValue();
+ LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
+ data.setDataChecksum(dataChecksum);
+ sendICR(container);
+ }
+
/**
* Called by BlockDeletingService to delete all the chunks in a block
* before proceeding to delete the block info from DB.
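
For illustration only (not part of this patch): the placeholder checksum above is simply a CRC32 over the 8-byte container ID. The sketch below shows the same idea using java.util.zip.CRC32 as a stand-in for Ozone's ChecksumByteBufferFactory.crc32Impl(); the two implementations are not guaranteed to produce identical values.

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    final class PlaceholderDataChecksum {
      static long forContainerId(long containerId) {
        ByteBuffer idBytes = ByteBuffer.allocate(Long.BYTES).putLong(containerId);
        idBytes.rewind();
        CRC32 crc = new CRC32();
        crc.update(idBytes);
        return crc.getValue();
      }
    }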
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 90ee356ab5..53def25860 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -210,7 +210,7 @@ public final class KeyValueContainerUtil {
long containerID = kvContainerData.getContainerID();
// Verify Checksum
- ContainerUtils.verifyChecksum(kvContainerData, config);
+ ContainerUtils.verifyContainerFileChecksum(kvContainerData, config);
if (kvContainerData.getSchemaVersion() == null) {
// If this container has not specified a schema version, it is in the old
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index feb5805387..feb86f3519 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto
.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -38,6 +39,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -188,6 +190,15 @@ public class ContainerController {
}
}
+ public void reconcileContainer(long containerID, List<DatanodeDetails> peers) throws IOException {
+ Container<?> container = containerSet.getContainer(containerID);
+ if (container == null) {
+ LOG.warn("Container {} to reconcile not found on this datanode.", containerID);
+ } else {
+ getHandler(container).reconcileContainer(container, peers);
+ }
+ }
+
/**
* Given a container, returns its handler instance.
*
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index f20094079c..9e5b5dbdab 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -122,7 +122,7 @@ public class ContainerImporter {
packer.unpackContainerDescriptor(input);
containerData = getKeyValueContainerData(containerDescriptorYaml);
}
- ContainerUtils.verifyChecksum(containerData, conf);
+ ContainerUtils.verifyContainerFileChecksum(containerData, conf);
containerData.setVolume(targetVolume);
try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
new file mode 100644
index 0000000000..cdd4522cc6
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+
+/**
+ * Asks datanodes to reconcile the specified container with other container replicas.
+ */
+public class ReconcileContainerCommand extends SCMCommand<ReconcileContainerCommandProto> {
+
+ private final List<DatanodeDetails> peerDatanodes;
+
+ public ReconcileContainerCommand(long containerID, List<DatanodeDetails> peerDatanodes) {
+ // Container ID serves as command ID, since only one reconciliation should be in progress at a time.
+ super(containerID);
+ this.peerDatanodes = peerDatanodes;
+ }
+
+
+ @Override
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.reconcileContainerCommand;
+ }
+
+ @Override
+ public ReconcileContainerCommandProto getProto() {
+ ReconcileContainerCommandProto.Builder builder = ReconcileContainerCommandProto.newBuilder()
+ .setContainerID(getId());
+ for (DatanodeDetails dd : peerDatanodes) {
+ builder.addPeers(dd.getProtoBufMessage());
+ }
+ return builder.build();
+ }
+
+ public List<DatanodeDetails> getPeerDatanodes() {
+ return peerDatanodes;
+ }
+
+ public long getContainerID() {
+ return getId();
+ }
+
+ public static ReconcileContainerCommand getFromProtobuf(ReconcileContainerCommandProto protoMessage) {
+ Preconditions.checkNotNull(protoMessage);
+
+ List<HddsProtos.DatanodeDetailsProto> peers = protoMessage.getPeersList();
+ List<DatanodeDetails> peerNodes = !peers.isEmpty()
+ ? peers.stream()
+ .map(DatanodeDetails::getFromProtoBuf)
+ .collect(Collectors.toList())
+ : emptyList();
+
+ return new ReconcileContainerCommand(protoMessage.getContainerID(), peerNodes);
+ }
+
+ @Override
+ public String toString() {
+ return getType() +
+ ": containerId=" + getContainerID() +
+ ", peerNodes=" + peerDatanodes;
+ }
+}
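
For illustration only (not part of this patch): a sketch of the protobuf round trip the command above supports, using the MockDatanodeDetails test helper that the new unit tests in this change also rely on (test scope only).

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
    import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;

    public final class ReconcileCommandRoundTrip {
      public static void main(String[] args) {
        List<DatanodeDetails> peers = Arrays.asList(
            MockDatanodeDetails.randomDatanodeDetails(),
            MockDatanodeDetails.randomDatanodeDetails());
        ReconcileContainerCommand original = new ReconcileContainerCommand(42L, peers);
        ReconcileContainerCommand decoded =
            ReconcileContainerCommand.getFromProtobuf(original.getProto());
        // Both values should match the originals: container ID 42 and 2 peers.
        System.out.println(decoded.getContainerID());
        System.out.println(decoded.getPeerDatanodes().size());
      }
    }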
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
index b3b0f5b437..4360243f0b 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
@@ -84,6 +84,7 @@ public class TestKeyValueContainerData {
assertEquals(val.get(), kvData.getBlockCount());
assertEquals(val.get(), kvData.getNumPendingDeletionBlocks());
assertEquals(MAXSIZE, kvData.getMaxSize());
+ assertEquals(0, kvData.getDataChecksum());
kvData.setState(state);
kvData.setContainerDBType(containerDBType);
@@ -98,6 +99,8 @@ public class TestKeyValueContainerData {
kvData.incrPendingDeletionBlocks(1);
kvData.setSchemaVersion(
VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf));
+ long expectedDataHash = 1234L;
+ kvData.setDataChecksum(expectedDataHash);
assertEquals(state, kvData.getState());
assertEquals(containerDBType, kvData.getContainerDBType());
@@ -114,6 +117,7 @@ public class TestKeyValueContainerData {
assertEquals(datanodeId.toString(), kvData.getOriginNodeId());
assertEquals(VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf),
kvData.getSchemaVersion());
+ assertEquals(expectedDataHash, kvData.getDataChecksum());
KeyValueContainerData newKvData = new KeyValueContainerData(kvData);
assertEquals(kvData.getReplicaIndex(), newKvData.getReplicaIndex());
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index 2235b23ce8..ad5ca48218 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -614,7 +614,7 @@ public class TestSchemaOneBackwardsCompatibility {
Yaml yaml = ContainerDataYaml.getYamlForContainerType(
kvData.getContainerType(),
kvData.getReplicaIndex() > 0);
- kvData.computeAndSetChecksum(yaml);
+ kvData.computeAndSetContainerFileChecksum(yaml);
KeyValueContainerUtil.parseKVContainerData(kvData, conf);
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
index ec78398824..2057c4400a 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
@@ -87,6 +87,7 @@ public class TestContainerDataYaml {
keyValueContainerData.setSchemaVersion(
VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion());
keyValueContainerData.setReplicaIndex(replicaIndex);
+ keyValueContainerData.setDataChecksum(12345);
File containerFile = new File(testRoot, containerPath);
@@ -218,7 +219,7 @@ public class TestContainerDataYaml {
File file = new File(classLoader.getResource(containerFile).getFile());
KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
.readContainerFile(file);
- ContainerUtils.verifyChecksum(kvData, conf);
+ ContainerUtils.verifyContainerFileChecksum(kvData, conf);
//Checking the Container file data is consistent or not
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, kvData
@@ -236,7 +237,7 @@ public class TestContainerDataYaml {
}
/**
- * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData,ConfigurationSource)}.
+ * Test to verify {@link ContainerUtils#verifyContainerFileChecksum(ContainerData,ConfigurationSource)}.
*/
@ContainerLayoutTestInfo.ContainerTest
public void testChecksumInContainerFile(ContainerLayoutVersion layout)
throws IOException {
@@ -247,13 +248,32 @@ public class TestContainerDataYaml {
// Read from .container file, and verify data.
KeyValueContainerData kvData = (KeyValueContainerData)
ContainerDataYaml.readContainerFile(containerFile);
- ContainerUtils.verifyChecksum(kvData, conf);
+ ContainerUtils.verifyContainerFileChecksum(kvData, conf);
cleanup();
}
/**
- * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData,ConfigurationSource)}.
+ * The container's data checksum is stored in a separate file with its Merkle hash tree. It should not be persisted
+ * to the .container file.
+ */
+ @ContainerLayoutTestInfo.ContainerTest
+ public void testDataChecksumNotInContainerFile(ContainerLayoutVersion layout) throws IOException {
+ setLayoutVersion(layout);
+ long containerID = testContainerID++;
+
+ File containerFile = createContainerFile(containerID, 0);
+
+ // Read from .container file. The kvData object should not have a data hash because it was not persisted in this
+ // file.
+ KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(containerFile);
+ assertEquals(0, kvData.getDataChecksum());
+
+ cleanup();
+ }
+
+ /**
+ * Test to verify {@link ContainerUtils#verifyContainerFileChecksum(ContainerData,ConfigurationSource)}.
*/
@ContainerLayoutTestInfo.ContainerTest
public void testChecksumInContainerFileWithReplicaIndex(
@@ -266,7 +286,7 @@ public class TestContainerDataYaml {
// Read from .container file, and verify data.
KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
.readContainerFile(containerFile);
- ContainerUtils.verifyChecksum(kvData, conf);
+ ContainerUtils.verifyContainerFileChecksum(kvData, conf);
cleanup();
}
@@ -287,7 +307,7 @@ public class TestContainerDataYaml {
setLayoutVersion(layout);
Exception ex = assertThrows(Exception.class, () -> {
KeyValueContainerData kvData = getKeyValueContainerData();
- ContainerUtils.verifyChecksum(kvData, conf);
+ ContainerUtils.verifyContainerFileChecksum(kvData, conf);
});
assertThat(ex).hasMessageStartingWith("Container checksum error for ContainerID:");
@@ -303,6 +323,6 @@ public class TestContainerDataYaml {
KeyValueContainerData kvData = getKeyValueContainerData();
conf.setBoolean(HddsConfigKeys.
HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED, false);
- ContainerUtils.verifyChecksum(kvData, conf);
+ ContainerUtils.verifyContainerFileChecksum(kvData, conf);
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 7f2cdcc6e5..d337b3a5f2 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -65,6 +65,7 @@ import
org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ozone.test.LambdaTestUtils;
@@ -707,6 +708,7 @@ public class TestStateContext {
ctx.addCommand(ReplicateContainerCommand.forTest(3));
ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
+ ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptyList()));
Map<SCMCommandProto.Type, Integer> summary = ctx.getCommandQueueSummary();
assertEquals(3,
@@ -715,6 +717,8 @@ public class TestStateContext {
summary.get(SCMCommandProto.Type.closePipelineCommand).intValue());
assertEquals(1,
summary.get(SCMCommandProto.Type.closeContainerCommand).intValue());
+ assertEquals(1,
+ summary.get(SCMCommandProto.Type.reconcileContainerCommand).intValue());
}
@Test
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
new file mode 100644
index 0000000000..d6be667f41
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.ozone.test.GenericTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests Datanode handling of reconcile container commands.
+ */
+public class TestReconcileContainerCommandHandler {
+ public static final Logger LOG = LoggerFactory.getLogger(TestReconcileContainerCommandHandler.class);
+
+ private static final int NUM_CONTAINERS = 3;
+
+ private ContainerSet containerSet;
+ private OzoneContainer ozoneContainer;
+ private StateContext context;
+ private ReconcileContainerCommandHandler subject;
+
+ public void init(ContainerLayoutVersion layout, IncrementalReportSender<Container> icrSender)
+ throws Exception {
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeDetails dnDetails = randomDatanodeDetails();
+ subject = new ReconcileContainerCommandHandler("");
+ context = ContainerTestUtils.getMockContext(dnDetails, conf);
+
+ containerSet = new ContainerSet(1000);
+ for (int id = 1; id <= NUM_CONTAINERS; id++) {
+ KeyValueContainerData data = new KeyValueContainerData(id, layout, GB,
+ PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString());
+ containerSet.addContainer(new KeyValueContainer(data, conf));
+ }
+
+ assertEquals(NUM_CONTAINERS, containerSet.containerCount());
+
+ Handler containerHandler = new KeyValueHandler(new OzoneConfiguration(), dnDetails.getUuidString(), containerSet,
+ mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender);
+ ContainerController controller = new ContainerController(containerSet,
+ singletonMap(ContainerProtos.ContainerType.KeyValueContainer, containerHandler));
+ ozoneContainer = mock(OzoneContainer.class);
+ when(ozoneContainer.getController()).thenReturn(controller);
+ when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+ }
+
+ @ContainerLayoutTestInfo.ContainerTest
+ public void testReconcileContainerCommandReports(ContainerLayoutVersion layout) throws Exception {
+ Map<ContainerID, ContainerReplicaProto> containerReportsSent = new HashMap<>();
+ IncrementalReportSender<Container> icrSender = c -> {
+ try {
+ ContainerID id = ContainerID.valueOf(c.getContainerData().getContainerID());
+ containerReportsSent.put(id, c.getContainerReport());
+ LOG.info("Added container report for container {}", id);
+ } catch (Exception ex) {
+ LOG.error("ICR sender failed", ex);
+ }
+ };
+ init(layout, icrSender);
+
+ for (int id = 1; id <= NUM_CONTAINERS; id++) {
+ ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList());
+ subject.handle(cmd, ozoneContainer, context, null);
+ }
+
+ // An unknown container should not trigger a container report being sent.
+ ReconcileContainerCommand unknownContainerCmd = new ReconcileContainerCommand(NUM_CONTAINERS + 1,
+ Collections.emptyList());
+ subject.handle(unknownContainerCmd, ozoneContainer, context, null);
+
+ waitForAllCommandsToFinish();
+ verifyAllContainerReports(containerReportsSent);
+ }
+
+ @ContainerLayoutTestInfo.ContainerTest
+ public void testReconcileContainerCommandMetrics(ContainerLayoutVersion layout) throws Exception {
+ // Used to block ICR sending so that queue metrics can be checked before the reconcile task completes.
+ CountDownLatch icrLatch = new CountDownLatch(1);
+ // Wait this long before completing the task.
+ // This provides a lower bound on execution time.
+ final long minExecTimeMillis = 500;
+ // This is the lower bound on execution time of all the commands combined.
+ final long expectedTotalMinExecTimeMillis = minExecTimeMillis * NUM_CONTAINERS;
+
+ IncrementalReportSender<Container> icrSender = c -> {
+ try {
+ // Block the caller until the latch is counted down.
+ // Caller can check queue metrics in the meantime.
+ LOG.info("ICR sender waiting for latch");
+ assertTrue(icrLatch.await(30, TimeUnit.SECONDS));
+ LOG.info("ICR sender proceeding after latch");
+
+ Thread.sleep(minExecTimeMillis);
+ } catch (Exception ex) {
+ LOG.error("ICR sender failed", ex);
+ }
+ };
+
+ init(layout, icrSender);
+
+ // All commands submitted will be blocked until the latch is counted down.
+ for (int id = 1; id <= NUM_CONTAINERS; id++) {
+ ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList());
+ subject.handle(cmd, ozoneContainer, context, null);
+ }
+ assertEquals(NUM_CONTAINERS, subject.getQueuedCount());
+ assertEquals(0, subject.getTotalRunTime());
+ assertEquals(0, subject.getAverageRunTime());
+
+ // This will resume handling of the tasks.
+ icrLatch.countDown();
+ waitForAllCommandsToFinish();
+
+ assertEquals(NUM_CONTAINERS, subject.getInvocationCount());
+ long totalRunTime = subject.getTotalRunTime();
+ assertTrue(totalRunTime >= expectedTotalMinExecTimeMillis,
+ "Total run time " + totalRunTime + "ms was not larger than the minimum
total exec time " +
+ expectedTotalMinExecTimeMillis + "ms");
+ long avgRunTime = subject.getAverageRunTime();
+ assertTrue(avgRunTime >= minExecTimeMillis,
+ "Average run time " + avgRunTime + "ms was not larger than the minimum
per task exec time " +
+ minExecTimeMillis + "ms");
+ }
+
+ private void waitForAllCommandsToFinish() throws Exception {
+ // Queue count should be decremented only after the task completes, so the other metrics should be consistent when
+ // it reaches zero.
+ GenericTestUtils.waitFor(() -> {
+ int qCount = subject.getQueuedCount();
+ LOG.info("Waiting for queued command count to reach 0. Currently at " +
qCount);
+ return qCount == 0;
+ }, 500, 3000);
+ }
+
+ private void verifyAllContainerReports(Map<ContainerID, ContainerReplicaProto> reportsSent) throws Exception {
+ assertEquals(NUM_CONTAINERS, reportsSent.size());
+
+ for (Map.Entry<ContainerID, ContainerReplicaProto> entry: reportsSent.entrySet()) {
+ ContainerID id = entry.getKey();
+ assertNotNull(containerSet.getContainer(id.getId()));
+
+ long sentDataChecksum = entry.getValue().getDataChecksum();
+ // Current implementation is incomplete, and uses a mocked checksum.
+ assertNotEquals(0, sentDataChecksum, "Report of container " + id + " should have a non-zero checksum");
+ }
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 7d8b94e57d..6245489f13 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.states.endpoint;
import static java.util.Collections.emptyList;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
import static
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,6 +58,7 @@ import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
@@ -111,6 +113,42 @@ public class TestHeartbeatEndpointTask {
.get(reconstructECContainersCommand).intValue());
}
+ @Test
+ public void testHandlesReconcileContainerCommand() throws Exception {
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+
+ List<DatanodeDetails> peerDNs = new ArrayList<>();
+ peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
+ peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
+ ReconcileContainerCommand cmd = new ReconcileContainerCommand(1, peerDNs);
+
+ when(scm.sendHeartbeat(any()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .addCommands(SCMCommandProto.newBuilder()
+ .setCommandType(reconcileContainerCommand)
+ .setReconcileContainerCommandProto(cmd.getProto())
+ .build())
+ .build());
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine =
mock(DatanodeStateMachine.class);
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ datanodeStateMachine, "");
+
+ // WHEN
+ HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
+ task.call();
+
+ // THEN
+ assertEquals(1, context.getCommandQueueSummary()
+ .get(reconcileContainerCommand).intValue());
+ }
+
@Test
public void testheartbeatWithoutReports() throws Exception {
final long termInSCM = 42;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index b9c8feae16..2ce6eabe39 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -37,7 +37,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -56,7 +58,9 @@ import org.apache.ozone.test.GenericTestUtils;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -64,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.any;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -433,6 +438,38 @@ public class TestKeyValueHandler {
}
}
+ @ContainerLayoutTestInfo.ContainerTest
+ public void testReconcileContainer(ContainerLayoutVersion layoutVersion)
throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ KeyValueContainerData data = new KeyValueContainerData(123L,
layoutVersion, GB,
+ PipelineID.randomId().toString(),
randomDatanodeDetails().getUuidString());
+
+ Container container = new KeyValueContainer(data, conf);
+ ContainerSet containerSet = new ContainerSet(1000);
+ containerSet.addContainer(container);
+
+ // Allows checking the invocation count of the lambda.
+ AtomicInteger icrCount = new AtomicInteger(0);
+ KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
randomDatanodeDetails().getUuidString(), containerSet,
+ mock(MutableVolumeSet.class), mock(ContainerMetrics.class), c -> {
+ // Check that the ICR contains expected info about the container.
+ ContainerReplicaProto report = c.getContainerReport();
+ long reportedID = report.getContainerID();
+ Assertions.assertEquals(container.getContainerData().getContainerID(),
reportedID);
+
+ long reportDataChecksum = report.getDataChecksum();
+ Assertions.assertNotEquals(0, reportDataChecksum,
+ "Container report should have populated the checksum field with a
non-zero value.");
+ icrCount.incrementAndGet();
+ });
+
+ Assertions.assertEquals(0, icrCount.get());
+ // This should trigger container report validation in the ICR handler
above.
+ keyValueHandler.reconcileContainer(container, Collections.emptyList());
+ Assertions.assertEquals(1, icrCount.get());
+ }
+
private static ContainerCommandRequestProto createContainerRequest(
String datanodeId, long containerID) {
return ContainerCommandRequestProto.newBuilder()
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
index 1b989e6bc7..6680a467b1 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
@@ -56,7 +56,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -152,8 +151,7 @@ class TestContainerImporter {
KeyValueContainerData containerData = spy(new
KeyValueContainerData(containerId,
ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"));
// mock to return different checksum
- when(containerData.getChecksum()).thenReturn("checksum1", "checksum2");
- doNothing().when(containerData).setChecksumTo0ByteArray();
+ when(containerData.getContainerFileChecksum()).thenReturn("checksum1",
"checksum2");
// create containerImporter object
ContainerController controllerMock = mock(ContainerController.class);
ContainerSet containerSet = new ContainerSet(0);
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 3570257b58..745790abc8 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -103,6 +103,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ResetDeletedBlockRetryCountRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -1198,4 +1199,13 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
String metricsJsonStr = response.getMetricsJson();
return metricsJsonStr;
}
+
+ @Override
+ public void reconcileContainer(long containerID) throws IOException {
+ ReconcileContainerRequestProto request =
ReconcileContainerRequestProto.newBuilder()
+ .setContainerID(containerID)
+ .build();
+ // TODO check error handling.
+ submitRequest(Type.ReconcileContainer, builder ->
builder.setReconcileContainerRequest(request));
+ }
}
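
[Editor note] For context, a hedged caller-side sketch of the new client API (not part of this patch): it assumes an already-built StorageContainerLocationProtocol proxy and simply forwards the container ID; SCM performs the eligibility checks server side.

  // Sketch only; proxy construction and the logging are assumptions, not part of this change.
  static void requestReconciliation(StorageContainerLocationProtocol scmClient, long containerID) {
    try {
      scmClient.reconcileContainer(containerID);
    } catch (IOException e) {
      // SCM rejects the request when the container is not eligible for reconciliation.
      System.err.println("Reconciliation of container " + containerID + " rejected: " + e.getMessage());
    }
  }
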
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index c190dc3f45..ea63b82c8c 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -84,6 +84,7 @@ message ScmContainerLocationRequest {
optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45;
optional GetContainersOnDecomNodeRequestProto
getContainersOnDecomNodeRequest = 46;
optional GetMetricsRequestProto getMetricsRequest = 47;
+ optional ReconcileContainerRequestProto reconcileContainerRequest = 48;
}
message ScmContainerLocationResponse {
@@ -139,6 +140,7 @@ message ScmContainerLocationResponse {
optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45;
optional GetContainersOnDecomNodeResponseProto
getContainersOnDecomNodeResponse = 46;
optional GetMetricsResponseProto getMetricsResponse = 47;
+ optional ReconcileContainerResponseProto reconcileContainerResponse = 48;
enum Status {
OK = 1;
@@ -193,6 +195,7 @@ enum Type {
SingleNodeQuery = 41;
GetContainersOnDecomNode = 42;
GetMetrics = 43;
+ ReconcileContainer = 44;
}
/**
@@ -637,6 +640,13 @@ message GetMetricsResponseProto {
optional string metricsJson = 1;
}
+message ReconcileContainerRequestProto {
+ required int64 containerID = 1;
+}
+
+message ReconcileContainerResponseProto {
+}
+
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 4058453123..93cdbf0462 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -432,6 +432,7 @@ message SCMContainerReplicaProto {
required int64 keyCount = 6;
required int64 bytesUsed = 7;
optional int64 replicaIndex = 8;
+ optional int64 dataChecksum = 9;
}
message KeyContainerIDList {
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 2994073c02..d5600880e6 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -230,12 +230,13 @@ message ContainerReplicaProto {
optional int64 writeCount = 7;
optional int64 readBytes = 8;
optional int64 writeBytes = 9;
- optional string finalhash = 10;
+ optional string finalhash = 10 [ deprecated = true ];
optional int64 deleteTransactionId = 11;
optional uint64 blockCommitSequenceId = 12;
optional string originNodeId = 13;
optional int32 replicaIndex = 14;
optional bool isEmpty = 15 [default = false];
+ optional int64 dataChecksum = 16;
}
message CommandStatusReportsProto {
@@ -328,6 +329,7 @@ message SCMCommandProto {
finalizeNewLayoutVersionCommand = 9;
refreshVolumeUsageInfo = 10;
reconstructECContainersCommand = 11;
+ reconcileContainerCommand = 12;
}
// TODO: once we start using protoc 3.x, refactor this message using "oneof"
required Type commandType = 1;
@@ -343,6 +345,7 @@ message SCMCommandProto {
finalizeNewLayoutVersionCommandProto = 10;
optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11;
optional ReconstructECContainersCommandProto
reconstructECContainersCommandProto = 12;
+ optional ReconcileContainerCommandProto reconcileContainerCommandProto = 13;
// If running upon Ratis, holds term of underlying RaftServer iff current
@@ -499,6 +502,15 @@ message FinalizeNewLayoutVersionCommandProto {
required int64 cmdId = 3;
}
+/**
+This command asks the datanode to reconcile its copy of a container with its
peer datanodes that also have a copy of
+the container.
+*/
+message ReconcileContainerCommandProto {
+ required int64 containerID = 1;
+ repeated DatanodeDetailsProto peers = 2;
+}
+
/**
* Protocol used from a datanode to StorageContainerManager.
*
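
[Editor note] A hedged sketch of how the new command proto is produced on the SCM side, using the ReconcileContainerCommand wrapper exercised in the tests in this patch; the container ID and peer list are illustrative values.

  // Sketch: build the wrapper command and serialize it to the new proto message.
  List<DatanodeDetails> peers = new ArrayList<>();
  peers.add(MockDatanodeDetails.randomDatanodeDetails());   // placeholder peer
  ReconcileContainerCommand cmd = new ReconcileContainerCommand(12L, peers);
  ReconcileContainerCommandProto proto = cmd.getProto();    // carries containerID and peers
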
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 3d281975f2..9739f06b4b 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -139,6 +139,7 @@ enum Status {
CA_ROTATION_IN_POST_PROGRESS = 44;
CONTAINER_ALREADY_CLOSED = 45;
CONTAINER_ALREADY_CLOSING = 46;
+ UNSUPPORTED_OPERATION = 47;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index 7e163ac306..db00d6842d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -381,6 +381,7 @@ public class AbstractContainerReportHandler {
.setReplicaIndex(replicaProto.getReplicaIndex())
.setBytesUsed(replicaProto.getUsed())
.setEmpty(replicaProto.getIsEmpty())
+ .setDataChecksum(replicaProto.getDataChecksum())
.build();
if (replica.getState().equals(State.DELETED)) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 78ebfd311d..d008e24f1c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -43,6 +43,7 @@ public final class ContainerReplica implements
Comparable<ContainerReplica> {
private final long keyCount;
private final long bytesUsed;
private final boolean isEmpty;
+ private final long dataChecksum;
private ContainerReplica(ContainerReplicaBuilder b) {
containerID = b.containerID;
@@ -54,6 +55,7 @@ public final class ContainerReplica implements
Comparable<ContainerReplica> {
replicaIndex = b.replicaIndex;
isEmpty = b.isEmpty;
sequenceId = b.sequenceId;
+ dataChecksum = b.dataChecksum;
}
/**
@@ -114,6 +116,10 @@ public final class ContainerReplica implements
Comparable<ContainerReplica> {
return isEmpty;
}
+ public long getDataChecksum() {
+ return dataChecksum;
+ }
+
@Override
public int hashCode() {
return new HashCodeBuilder(61, 71)
@@ -201,6 +207,7 @@ public final class ContainerReplica implements
Comparable<ContainerReplica> {
private long keyCount;
private int replicaIndex;
private boolean isEmpty;
+ private long dataChecksum;
/**
* Set Container Id.
@@ -275,6 +282,11 @@ public final class ContainerReplica implements
Comparable<ContainerReplica> {
return this;
}
+ public ContainerReplicaBuilder setDataChecksum(long dataChecksum) {
+ this.dataChecksum = dataChecksum;
+ return this;
+ }
+
/**
* Constructs new ContainerReplicaBuilder.
*
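
[Editor note] With the new dataChecksum field on ContainerReplica, SCM can compare checksums across the replicas of a container. A minimal sketch, assuming a containerManager instance is available:

  // Sketch: replicas reporting different data checksums are candidates for reconciliation.
  Set<ContainerReplica> replicas = containerManager.getContainerReplicas(containerID);
  long distinctChecksums = replicas.stream()
      .map(ContainerReplica::getDataChecksum)
      .distinct()
      .count();
  boolean replicasDiverge = distinctChecksums > 1;
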
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
new file mode 100644
index 0000000000..f13b37f3ee
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+
+/**
+ * When a reconcile container event is fired, this class will check if the
container is eligible for reconciliation,
+ * and if so, send the reconcile request to all datanodes with a replica of
that container.
+ */
+public class ReconcileContainerEventHandler implements
EventHandler<ContainerID> {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ReconcileContainerEventHandler.class);
+
+ private final ContainerManager containerManager;
+ private final SCMContext scmContext;
+
+ public ReconcileContainerEventHandler(ContainerManager containerManager,
SCMContext scmContext) {
+ this.containerManager = containerManager;
+ this.scmContext = scmContext;
+ }
+
+ @Override
+ public void onMessage(ContainerID containerID, EventPublisher publisher) {
+ if (!scmContext.isLeader()) {
+ LOG.info("Skip reconciling container {} since current SCM is not
leader.", containerID);
+ return;
+ }
+
+ EligibilityResult result =
ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID,
+ containerManager);
+ if (!result.isOk()) {
+ LOG.error("{}", result);
+ return;
+ }
+
+ try {
+ // TODO HDDS-10714 restrict peer and target nodes based on node status.

+ Set<DatanodeDetails> allReplicaNodes =
containerManager.getContainerReplicas(containerID)
+ .stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toSet());
+
+ LOG.info("Reconcile container event triggered for container {} with
peers {}", containerID, allReplicaNodes);
+
+ for (DatanodeDetails replica : allReplicaNodes) {
+ List<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
+ .filter(other -> !other.equals(replica))
+ .collect(Collectors.toList());
+ ReconcileContainerCommand command = new
ReconcileContainerCommand(containerID.getId(), otherReplicas);
+ command.setTerm(scmContext.getTermOfLeader());
+ publisher.fireEvent(DATANODE_COMMAND, new
CommandForDatanode<>(replica.getUuid(), command));
+ }
+ } catch (ContainerNotFoundException ex) {
+ LOG.error("Failed to start reconciliation for container {}. Container
not found.", containerID);
+ } catch (NotLeaderException nle) {
+ LOG.info("Skip reconciling container {} since current SCM is not
leader.", containerID);
+ }
+ }
+}
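
[Editor note] The handler is driven by the SCM event queue; the wiring below mirrors what StorageContainerManager and SCMClientProtocolServer do later in this patch (the eventQueue, containerManager and scmContext instances are assumed to exist).

  // Register the handler for reconcile events, then fire one for a container.
  ReconcileContainerEventHandler handler =
      new ReconcileContainerEventHandler(containerManager, scmContext);
  eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER, handler);
  // Sends a ReconcileContainerCommand to every datanode holding a replica of container 12.
  eventQueue.fireEvent(SCMEvents.RECONCILE_CONTAINER, ContainerID.valueOf(12L));
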
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
new file mode 100644
index 0000000000..cdc08556d2
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Determines whether a container is eligible for reconciliation based on its
state, replica states, replication
+ * type, and replication factor.
+ */
+public final class ReconciliationEligibilityHandler {
+ public static final Set<HddsProtos.LifeCycleState> ELIGIBLE_CONTAINER_STATES
=
+ EnumSet.of(HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleState.QUASI_CLOSED);
+ public static final Set<State> ELIGIBLE_REPLICA_STATES =
+ EnumSet.of(State.CLOSED, State.QUASI_CLOSED, State.UNHEALTHY);
+
+ /**
+ * Utility class only.
+ */
+ private ReconciliationEligibilityHandler() { }
+
+ public static EligibilityResult isEligibleForReconciliation(
+ ContainerID containerID, ContainerManager containerManager) {
+ ContainerInfo container;
+ Set<ContainerReplica> replicas;
+ try {
+ container = containerManager.getContainer(containerID);
+ replicas = containerManager.getContainerReplicas(containerID);
+ } catch (ContainerNotFoundException ex) {
+ return new EligibilityResult(Result.CONTAINER_NOT_FOUND,
+ String.format("Container %s not found for reconciliation.",
containerID));
+ }
+
+ if (!ELIGIBLE_CONTAINER_STATES.contains(container.getState())) {
+ return new EligibilityResult(Result.INELIGIBLE_CONTAINER_STATE,
+ String.format("Cannot reconcile container %d in state %s.",
container.getContainerID(),
+ container.getState()));
+ }
+
+ if (replicas.isEmpty()) {
+ return new EligibilityResult(Result.NO_REPLICAS_FOUND,
+ String.format("Cannot reconcile container %d because no replicas
could be found.",
+ container.getContainerID()));
+ }
+
+ boolean replicasValid = replicas.stream()
+ .map(ContainerReplica::getState)
+ .allMatch(ELIGIBLE_REPLICA_STATES::contains);
+ if (!replicasValid) {
+ return new EligibilityResult(Result.INELIGIBLE_REPLICA_STATES,
+ String.format("Cannot reconcile container %s in state %s with
replica states: %s", containerID,
+ container.getState(), replicas.stream()
+ .map(r -> r.getState().toString())
+ .collect(Collectors.joining(", "))));
+ }
+
+ // Reconciliation of EC containers is not yet implemented.
+ ReplicationConfig repConfig = container.getReplicationConfig();
+ if (repConfig.getReplicationType() != HddsProtos.ReplicationType.RATIS) {
+ return new EligibilityResult(Result.INELIGIBLE_REPLICATION_TYPE,
+ String.format("Cannot reconcile container %s with replication type
%s. Reconciliation is currently only " +
+ "supported for Ratis containers.", containerID,
repConfig.getReplicationType()));
+ }
+
+ // Reconciliation requires multiple replicas to reconcile.
+ int requiredNodes = repConfig.getRequiredNodes();
+ if (requiredNodes <= 1) {
+ return new EligibilityResult(Result.NOT_ENOUGH_REQUIRED_NODES,
+ String.format("Cannot reconcile container %s with %d required nodes.
Reconciliation is only supported for " +
+ "containers with more than 1 required node.", containerID,
repConfig.getRequiredNodes()));
+ }
+
+ return new EligibilityResult(Result.OK, "Container %s is eligible for
reconciliation." + containerID);
+ }
+
+ /**
+ * Defines the reasons a container may not be eligible for reconciliation.
+ */
+ public enum Result {
+ OK,
+ CONTAINER_NOT_FOUND,
+ INELIGIBLE_CONTAINER_STATE,
+ INELIGIBLE_REPLICA_STATES,
+ INELIGIBLE_REPLICATION_TYPE,
+ NOT_ENOUGH_REQUIRED_NODES,
+ NO_REPLICAS_FOUND
+ }
+
+ /**
+ * Provides a status and message indicating whether a container is eligible
for reconciliation.
+ */
+ public static final class EligibilityResult {
+ private final Result result;
+ private final String message;
+
+ private EligibilityResult(Result result, String message) {
+ this.result = result;
+ this.message = message;
+ }
+
+ public Result getResult() {
+ return result;
+ }
+
+ public boolean isOk() {
+ return result == Result.OK;
+ }
+
+ @Override
+ public String toString() {
+ return message;
+ }
+ }
+}
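
[Editor note] Callers are expected to run the eligibility check before firing the reconcile event; a hedged usage sketch of the pattern that SCMClientProtocolServer follows later in this patch:

  // Sketch: gate reconciliation on the eligibility result.
  EligibilityResult result =
      ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID, containerManager);
  if (result.isOk()) {
    eventQueue.fireEvent(SCMEvents.RECONCILE_CONTAINER, containerID);
  } else {
    LOG.warn("Skipping reconciliation: {}", result);   // toString() carries the reason
  }
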
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
new file mode 100644
index 0000000000..fa1e355fd1
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+/**
+ * This package contains classes related to container reconciliation.
+ */
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 0cc205b2ff..3b2a84f4f3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -217,6 +217,13 @@ public final class SCMEvents {
new TypedEvent<>(CRLStatusReportFromDatanode.class,
"Crl_Status_Report");
+ /**
+ * This event will be triggered whenever a datanode needs to reconcile its
replica of a container with other
+ * replicas in the cluster.
+ */
+ public static final TypedEvent<ContainerID>
+ RECONCILE_CONTAINER = new TypedEvent<>(ContainerID.class,
"Reconcile_Container");
+
/**
* Private Ctor. Never Constructed.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 3d7cff358f..b88f2c2a7d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -30,6 +30,8 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipReques
import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
@@ -722,6 +724,12 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.setStatus(Status.OK)
.setGetMetricsResponse(getMetrics(request.getGetMetricsRequest()))
.build();
+ case ReconcileContainer:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+
.setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest()))
+ .build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
@@ -1333,4 +1341,9 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request)
throws IOException {
return
GetMetricsResponseProto.newBuilder().setMetricsJson(impl.getMetrics(request.getQuery())).build();
}
+
+ public ReconcileContainerResponseProto
reconcileContainer(ReconcileContainerRequestProto request) throws IOException {
+ impl.reconcileContainer(request.getContainerID());
+ return ReconcileContainerResponseProto.getDefaultInstance();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 47bc66d833..9cf44677eb 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -55,6 +55,8 @@ import
org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfigurat
import
org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException;
import
org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler;
+import
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@@ -336,7 +338,9 @@ public class SCMClientProtocolServer implements
.setPlaceOfBirth(r.getOriginDatanodeId().toString())
.setKeyCount(r.getKeyCount())
.setSequenceID(r.getSequenceId())
- .setReplicaIndex(r.getReplicaIndex()).build()
+ .setReplicaIndex(r.getReplicaIndex())
+ .setDataChecksum(r.getDataChecksum())
+ .build()
);
}
return results;
@@ -1438,4 +1442,42 @@ public class SCMClientProtocolServer implements
FetchMetrics fetchMetrics = new FetchMetrics();
return fetchMetrics.getMetrics(query);
}
+
+ @Override
+ public void reconcileContainer(long longContainerID) throws IOException {
+ ContainerID containerID = ContainerID.valueOf(longContainerID);
+ getScm().checkAdminAccess(getRemoteUser(), false);
+ final UserGroupInformation remoteUser = getRemoteUser();
+ final Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("containerID", containerID.toString());
+ auditMap.put("remoteUser", remoteUser.getUserName());
+
+ try {
+ EligibilityResult result =
ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID,
+ getScm().getContainerManager());
+ if (!result.isOk()) {
+ switch (result.getResult()) {
+ case OK:
+ break;
+ case CONTAINER_NOT_FOUND:
+ throw new ContainerNotFoundException(result.toString());
+ case INELIGIBLE_CONTAINER_STATE:
+ throw new SCMException(result.toString(),
ResultCodes.UNEXPECTED_CONTAINER_STATE);
+ case INELIGIBLE_REPLICA_STATES:
+ case INELIGIBLE_REPLICATION_TYPE:
+ case NOT_ENOUGH_REQUIRED_NODES:
+ case NO_REPLICAS_FOUND:
+ throw new SCMException(result.toString(),
ResultCodes.UNSUPPORTED_OPERATION);
+ default:
+ throw new SCMException("Unknown reconciliation eligibility result "
+ result, ResultCodes.INTERNAL_ERROR);
+ }
+ }
+
+ scm.getEventQueue().fireEvent(SCMEvents.RECONCILE_CONTAINER,
containerID);
+
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(SCMAction.RECONCILE_CONTAINER,
auditMap));
+ } catch (SCMException ex) {
+
AUDIT.logWriteFailure(buildAuditMessageForFailure(SCMAction.RECONCILE_CONTAINER,
auditMap, ex));
+ throw ex;
+ }
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 3d864d4ea2..98a7aa22f3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -73,6 +73,7 @@ import
org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -94,6 +95,7 @@ import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.refreshVolumeUsageInfo;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
@@ -407,6 +409,12 @@ public class SCMDatanodeProtocolServer implements
.setRefreshVolumeUsageCommandProto(
((RefreshVolumeUsageCommand)cmd).getProto())
.build();
+ case reconcileContainerCommand:
+ return builder
+ .setCommandType(reconcileContainerCommand)
+ .setReconcileContainerCommandProto(
+ ((ReconcileContainerCommand)cmd).getProto())
+ .build();
default:
throw new IllegalArgumentException("Scm command " +
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index e86dab5fd7..33fbf5fb76 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import
org.apache.hadoop.hdds.scm.container.reconciliation.ReconcileContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import
org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
@@ -506,6 +507,9 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
CRLStatusReportHandler crlStatusReportHandler =
new CRLStatusReportHandler(certificateStore, configuration);
+ ReconcileContainerEventHandler reconcileContainerEventHandler =
+ new ReconcileContainerEventHandler(containerManager, scmContext);
+
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND,
scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -578,6 +582,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
+ eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER,
reconcileContainerEventHandler);
scmNodeManager.registerSendCommandNotify(
SCMCommandProto.Type.deleteBlocksCommand,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index 4e1fe234ff..2c9df2afb4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -52,7 +52,8 @@ public enum SCMAction implements AuditAction {
GET_REPLICATION_MANAGER_REPORT,
RESET_DELETED_BLOCK_RETRY_COUNT,
TRANSFER_LEADERSHIP,
- GET_FAILED_DELETED_BLOCKS_TRANSACTION;
+ GET_FAILED_DELETED_BLOCKS_TRANSACTION,
+ RECONCILE_CONTAINER;
@Override
public String getAction() {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 695c88d11a..8b77e30013 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -60,12 +60,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
@@ -100,7 +102,6 @@ public class TestContainerReportHandler {
dbStore = DBStoreBuilder.createDBStore(
conf, new SCMDBDefinition());
scmhaManager = SCMHAManagerStub.getInstance(true);
- nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
containerStateManager = ContainerStateManagerImpl.newBuilder()
@@ -979,6 +980,153 @@ public class TestContainerReportHandler {
containerOne.containerID()).size());
}
+ @Test
+ public void testWithNoContainerDataChecksum() throws Exception {
+ final ContainerReportHandler reportHandler = new
ContainerReportHandler(nodeManager, containerManager);
+
+ final int numNodes = 3;
+ List<DatanodeDetails> datanodes =
nodeManager.getNodes(NodeStatus.inServiceHealthy()).stream()
+ .limit(numNodes)
+ .collect(Collectors.toList());
+
+ // Create a container and put one replica on each datanode.
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+ ContainerID contID = container.containerID();
+ final Set<ContainerID> containerIDSet =
Stream.of(contID).collect(Collectors.toSet());
+
+ for (DatanodeDetails dn: datanodes) {
+ nodeManager.setContainers(dn, containerIDSet);
+ }
+
+ containerStateManager.addContainer(container.getProtobuf());
+
+ getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+ .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+ // Container manager should now be aware of 3 replicas of each container.
+ assertEquals(numNodes,
containerManager.getContainerReplicas(contID).size());
+
+ // All replicas should start with an empty data checksum in SCM.
+ boolean contOneDataChecksumsEmpty =
containerManager.getContainerReplicas(contID).stream()
+ .allMatch(r -> r.getDataChecksum() == 0);
+ assertTrue(contOneDataChecksumsEmpty, "Replicas of container one should
not yet have any data checksums.");
+
+ // Send a report to SCM from each datanode; none of the replicas have a data checksum yet.
+ int numReportsSent = 0;
+ for (DatanodeDetails dn: datanodes) {
+ final ContainerReportsProto dnReportProto = getContainerReportsProto(
+ contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+ final ContainerReportFromDatanode dnReport = new
ContainerReportFromDatanode(dn, dnReportProto);
+ reportHandler.onMessage(dnReport, publisher);
+ numReportsSent++;
+ }
+ assertEquals(numNodes, numReportsSent);
+
+ // Regardless of which datanode sent the report, none of them have checksums, so all replicas' data checksums
+ // should remain empty.
+ boolean containerDataChecksumEmpty =
containerManager.getContainerReplicas(contID).stream()
+ .allMatch(r -> r.getDataChecksum() == 0);
+ assertTrue(containerDataChecksumEmpty, "Replicas of the container should
not have any data checksums.");
+ }
+
+ @Test
+ public void testWithContainerDataChecksum() throws Exception {
+ final ContainerReportHandler reportHandler = new
ContainerReportHandler(nodeManager, containerManager);
+
+ final int numNodes = 3;
+ List<DatanodeDetails> datanodes =
nodeManager.getNodes(NodeStatus.inServiceHealthy()).stream()
+ .limit(numNodes)
+ .collect(Collectors.toList());
+
+ // Create a container and put one replica on each datanode.
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+ ContainerID contID = container.containerID();
+ final Set<ContainerID> containerIDSet =
Stream.of(contID).collect(Collectors.toSet());
+
+ for (DatanodeDetails dn: datanodes) {
+ nodeManager.setContainers(dn, containerIDSet);
+ }
+
+ containerStateManager.addContainer(container.getProtobuf());
+
+ getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+ .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+ // Container manager should now be aware of 3 replicas of each container.
+ assertEquals(numNodes,
containerManager.getContainerReplicas(contID).size());
+
+ // All replicas should start with an empty data checksum in SCM.
+ boolean dataChecksumsEmpty =
containerManager.getContainerReplicas(contID).stream()
+ .allMatch(r -> r.getDataChecksum() == 0);
+ assertTrue(dataChecksumsEmpty, "Replicas of container one should not yet
have any data checksums.");
+
+ // For each datanode, send a container report with a mismatched checksum.
+ for (DatanodeDetails dn: datanodes) {
+ ContainerReportsProto dnReportProto = getContainerReportsProto(
+ contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+ ContainerReplicaProto replicaWithChecksum =
dnReportProto.getReports(0).toBuilder()
+ .setDataChecksum(createUniqueDataChecksumForReplica(contID,
dn.getUuidString()))
+ .build();
+ ContainerReportsProto reportWithChecksum = dnReportProto.toBuilder()
+ .clearReports()
+ .addReports(replicaWithChecksum)
+ .build();
+ final ContainerReportFromDatanode dnReport = new
ContainerReportFromDatanode(dn, reportWithChecksum);
+ reportHandler.onMessage(dnReport, publisher);
+ }
+
+ // All the replicas should have different checksums.
+ // Since the containers don't have any data in this test, different
checksums are based on container ID and
+ // datanode ID.
+ int numReplicasChecked = 0;
+ for (ContainerReplica replica:
containerManager.getContainerReplicas(contID)) {
+ long expectedChecksum = createUniqueDataChecksumForReplica(contID,
replica.getDatanodeDetails().getUuidString());
+ assertEquals(expectedChecksum, replica.getDataChecksum());
+ numReplicasChecked++;
+ }
+ assertEquals(numNodes, numReplicasChecked);
+
+ // For each datanode, send a container report with a matching checksum.
+ // This simulates reconciliation running.
+ for (DatanodeDetails dn: datanodes) {
+ ContainerReportsProto dnReportProto = getContainerReportsProto(
+ contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+ ContainerReplicaProto replicaWithChecksum =
dnReportProto.getReports(0).toBuilder()
+ .setDataChecksum(createMatchingDataChecksumForReplica(contID))
+ .build();
+ ContainerReportsProto reportWithChecksum = dnReportProto.toBuilder()
+ .clearReports()
+ .addReports(replicaWithChecksum)
+ .build();
+ final ContainerReportFromDatanode dnReport = new
ContainerReportFromDatanode(dn, reportWithChecksum);
+ reportHandler.onMessage(dnReport, publisher);
+ }
+
+ // All the replicas should now have matching checksums.
+ // Since the containers don't have any data in this test, the matching
checksums are based on container ID only.
+ numReplicasChecked = 0;
+ for (ContainerReplica replica:
containerManager.getContainerReplicas(contID)) {
+ long expectedChecksum = createMatchingDataChecksumForReplica(contID);
+ assertEquals(expectedChecksum, replica.getDataChecksum());
+ numReplicasChecked++;
+ }
+ assertEquals(numNodes, numReplicasChecked);
+ }
+
+ /**
+ * Generates a placeholder data checksum for testing that is specific to a
container replica.
+ */
+ protected static long createUniqueDataChecksumForReplica(ContainerID
containerID, String datanodeID) {
+ return (datanodeID + containerID).hashCode();
+ }
+
+ /**
+ * Generates a placeholder data checksum for testing that is the same for
all container replicas.
+ */
+ protected static long createMatchingDataChecksumForReplica(ContainerID
containerID) {
+ return Objects.hashCode(containerID);
+ }
+
private ContainerReportFromDatanode getContainerReportFromDatanode(
ContainerID containerId, ContainerReplicaProto.State state,
DatanodeDetails dn, long bytesUsed, long keyCount) {
@@ -1021,7 +1169,6 @@ public class TestContainerReportHandler {
.setContainerID(containerId.getId())
.setState(state)
.setOriginNodeId(originNodeId)
- .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
.setSize(5368709120L)
.setUsed(usedBytes)
.setKeyCount(keyCount)
@@ -1035,5 +1182,4 @@ public class TestContainerReportHandler {
.build();
return crBuilder.addReports(replicaProto).build();
}
-
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index dbcccce598..9abbda8193 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -76,6 +76,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
@@ -83,6 +84,9 @@ import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
+import static
org.apache.hadoop.hdds.scm.container.TestContainerReportHandler.createMatchingDataChecksumForReplica;
+import static
org.apache.hadoop.hdds.scm.container.TestContainerReportHandler.createUniqueDataChecksumForReplica;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
@@ -576,6 +580,144 @@ public class TestIncrementalContainerReportHandler {
}
}
+ @Test
+ public void testWithNoContainerDataChecksum() throws Exception {
+ final IncrementalContainerReportHandler reportHandler = new
IncrementalContainerReportHandler(nodeManager,
+ containerManager, scmContext);
+
+ final int numNodes = 3;
+
+ // Create a container which will have one replica on each datanode.
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+ ContainerID contID = container.containerID();
+ final Set<ContainerID> containerIDSet =
Stream.of(contID).collect(Collectors.toSet());
+
+ List<DatanodeDetails> datanodes = new ArrayList<>();
+ for (int i = 0; i < numNodes; i++) {
+ DatanodeDetails dn = randomDatanodeDetails();
+ nodeManager.register(dn, null, null);
+ nodeManager.setContainers(dn, containerIDSet);
+ datanodes.add(dn);
+ }
+
+ containerStateManager.addContainer(container.getProtobuf());
+
+ getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+ .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+ // Container manager should now be aware of 3 replicas of each container.
+ assertEquals(numNodes,
containerManager.getContainerReplicas(contID).size());
+
+ // All replicas should start with an empty data checksum in SCM.
+ boolean contOneDataChecksumsEmpty =
containerManager.getContainerReplicas(contID).stream()
+ .allMatch(r -> r.getDataChecksum() == 0);
+ assertTrue(contOneDataChecksumsEmpty, "Replicas of container one should
not yet have any data checksums.");
+
+ // Send a report to SCM from each datanode; none of the replicas have a data checksum yet.
+ for (DatanodeDetails dn: datanodes) {
+ final IncrementalContainerReportProto dnReportProto =
getIncrementalContainerReportProto(
+ contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+ final IncrementalContainerReportFromDatanode dnReport = new
IncrementalContainerReportFromDatanode(dn,
+ dnReportProto);
+ reportHandler.onMessage(dnReport, publisher);
+ }
+
+ // Regardless of which datanode sent the report, none of them have checksums, so all replicas' data checksums
+ // should remain empty.
+ boolean containerDataChecksumEmpty =
containerManager.getContainerReplicas(contID).stream()
+ .allMatch(r -> r.getDataChecksum() == 0);
+ assertTrue(containerDataChecksumEmpty, "Replicas of the container should
not have any data checksums.");
+ }
+
+ @Test
+ public void testWithContainerDataChecksum() throws Exception {
+ final IncrementalContainerReportHandler reportHandler = new
IncrementalContainerReportHandler(nodeManager,
+ containerManager, scmContext);
+
+ final int numNodes = 3;
+
+ // Create a container which will have one replica on each datanode.
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+ ContainerID contID = container.containerID();
+ final Set<ContainerID> containerIDSet =
Stream.of(contID).collect(Collectors.toSet());
+
+ List<DatanodeDetails> datanodes = new ArrayList<>();
+ for (int i = 0; i < numNodes; i++) {
+ DatanodeDetails dn = randomDatanodeDetails();
+ nodeManager.register(dn, null, null);
+ nodeManager.setContainers(dn, containerIDSet);
+ datanodes.add(dn);
+ }
+
+ containerStateManager.addContainer(container.getProtobuf());
+
+ getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+ .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+ // Container manager should now be aware of 3 replicas of each container.
+ assertEquals(numNodes, containerManager.getContainerReplicas(contID).size());
+
+ // All replicas should start with a zero data checksum in SCM.
+ boolean dataChecksumsEmpty =
containerManager.getContainerReplicas(contID).stream()
+ .allMatch(r -> r.getDataChecksum() == 0);
+ assertTrue(dataChecksumsEmpty, "Replicas of container one should not yet
have any data checksums.");
+
+ // For each datanode, send a container report with a mismatched checksum.
+ for (DatanodeDetails dn: datanodes) {
+ IncrementalContainerReportProto dnReportProto =
getIncrementalContainerReportProto(
+ contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+ ContainerReplicaProto replicaWithChecksum =
dnReportProto.getReport(0).toBuilder()
+ .setDataChecksum(createUniqueDataChecksumForReplica(contID,
dn.getUuidString()))
+ .build();
+ IncrementalContainerReportProto reportWithChecksum =
dnReportProto.toBuilder()
+ .clearReport()
+ .addReport(replicaWithChecksum)
+ .build();
+ final IncrementalContainerReportFromDatanode dnReport = new
IncrementalContainerReportFromDatanode(dn,
+ reportWithChecksum);
+ reportHandler.onMessage(dnReport, publisher);
+ }
+
+ // All the replicas should have different checksums.
+ // Since the containers don't have any data in this test, different
checksums are based on container ID and
+ // datanode ID.
+ int numReplicasChecked = 0;
+ for (ContainerReplica replica:
containerManager.getContainerReplicas(contID)) {
+ long expectedChecksum = createUniqueDataChecksumForReplica(
+ contID, replica.getDatanodeDetails().getUuidString());
+ assertEquals(expectedChecksum, replica.getDataChecksum());
+ numReplicasChecked++;
+ }
+ assertEquals(numNodes, numReplicasChecked);
+
+ // For each datanode, send a container report with a matching checksum.
+ // This simulates reconciliation running.
+ for (DatanodeDetails dn: datanodes) {
+ IncrementalContainerReportProto dnReportProto =
getIncrementalContainerReportProto(
+ contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+ ContainerReplicaProto replicaWithChecksum =
dnReportProto.getReport(0).toBuilder()
+ .setDataChecksum(createMatchingDataChecksumForReplica(contID))
+ .build();
+ IncrementalContainerReportProto reportWithChecksum =
dnReportProto.toBuilder()
+ .clearReport()
+ .addReport(replicaWithChecksum)
+ .build();
+ IncrementalContainerReportFromDatanode dnReport = new
IncrementalContainerReportFromDatanode(dn,
+ reportWithChecksum);
+ reportHandler.onMessage(dnReport, publisher);
+ }
+
+ // All the replicas should now have matching checksums.
+ // Since the containers don't have any data in this test, the matching
checksums are based on container ID only.
+ numReplicasChecked = 0;
+ for (ContainerReplica replica:
containerManager.getContainerReplicas(contID)) {
+ long expectedChecksum = createMatchingDataChecksumForReplica(contID);
+ assertEquals(expectedChecksum, replica.getDataChecksum());
+ numReplicasChecked++;
+ }
+ assertEquals(numNodes, numReplicasChecked);
+ }
+
private static IncrementalContainerReportProto
getIncrementalContainerReportProto(ContainerReplicaProto replicaProto) {
final IncrementalContainerReportProto.Builder crBuilder =
@@ -595,7 +737,6 @@ public class TestIncrementalContainerReportHandler {
.setContainerID(containerId.getId())
.setState(state)
.setOriginNodeId(originNodeId)
- .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
.setSize(5368709120L)
.setUsed(2000000000L)
.setKeyCount(100000000L)
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java
new file mode 100644
index 0000000000..ffc96217b4
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
+import
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.Result;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that the ReconcileContainerEventHandler properly accepts and rejects
reconciliation events based on
+ * container state, and dispatches commands to datanodes accordingly.
+ */
+public class TestReconcileContainerEventHandler {
+ private ContainerManager containerManager;
+ private EventPublisher eventPublisher;
+ private ReconcileContainerEventHandler eventHandler;
+ private SCMContext scmContext;
+
+ private static final ContainerID CONTAINER_ID = ContainerID.valueOf(123L);
+ private static final long LEADER_TERM = 3L;
+
+ private static final ReplicationConfig RATIS_THREE_REP =
RatisReplicationConfig.getInstance(THREE);
+ private static final ReplicationConfig RATIS_ONE_REP =
RatisReplicationConfig.getInstance(ONE);
+ private static final ReplicationConfig EC_REP = new ECReplicationConfig(3,
2);
+
+ private ArgumentCaptor<CommandForDatanode<ReconcileContainerCommandProto>>
commandCaptor;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ commandCaptor = ArgumentCaptor.forClass(CommandForDatanode.class);
+ containerManager = mock(ContainerManager.class);
+ scmContext = mock(SCMContext.class);
+ when(scmContext.isLeader()).thenReturn(true);
+ when(scmContext.getTermOfLeader()).thenReturn(LEADER_TERM);
+ eventPublisher = mock(EventPublisher.class);
+ eventHandler = new ReconcileContainerEventHandler(containerManager,
scmContext);
+ }
+
+ /**
+ * EC containers are not yet supported for reconciliation.
+ */
+ @Test
+ public void testReconcileECContainer() throws Exception {
+ addContainer(EC_REP, LifeCycleState.CLOSED);
+ addReplicasToContainer(5);
+
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ assertFalse(result.isOk());
+ assertEquals(Result.INELIGIBLE_REPLICATION_TYPE, result.getResult());
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+ }
+
+ /**
+ * Ratis 1 containers are not currently supported for reconciliation.
+ */
+ @Test
+ public void testReconcileRatisOneContainer() throws Exception {
+ addContainer(RATIS_ONE_REP, LifeCycleState.CLOSED);
+ addReplicasToContainer(1);
+
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ assertFalse(result.isOk());
+ assertEquals(Result.NOT_ENOUGH_REQUIRED_NODES, result.getResult());
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+ }
+
+ @Test
+ public void testReconcileWhenNotLeader() throws Exception {
+ addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+ addReplicasToContainer(3);
+ when(scmContext.isLeader()).thenReturn(false);
+
+ // Container is eligible for reconciliation, but the request will not go
through because this SCM is not the leader.
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ assertTrue(result.isOk());
+ assertEquals(Result.OK, result.getResult());
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+ }
+
+ @Test
+ public void testReconcileNonexistentContainer() throws Exception {
+ // The step of adding the container to the mocked ContainerManager is
intentionally skipped to simulate a
+ // nonexistent container.
+ // No exceptions should be thrown out of this test method when this
happens. If they are, they will be propagated
+ // and the test will fail.
+ when(containerManager.getContainer(any())).thenThrow(new
ContainerNotFoundException());
+
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ assertFalse(result.isOk());
+ assertEquals(Result.CONTAINER_NOT_FOUND, result.getResult());
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+ }
+
+ @Test
+ public void testReconcileMissingContainer() throws Exception {
+ addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+ assertTrue(containerManager.getContainerReplicas(CONTAINER_ID).isEmpty(),
+ "Expected no replicas for this container");
+
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ assertFalse(result.isOk());
+ assertEquals(Result.NO_REPLICAS_FOUND, result.getResult());
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+ }
+
+ @ParameterizedTest
+ @EnumSource(LifeCycleState.class)
+ public void testReconcileWithContainerStates(LifeCycleState state) throws
Exception {
+ addContainer(RATIS_THREE_REP, state);
+ addReplicasToContainer(3);
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ switch (state) {
+ case OPEN:
+ case CLOSING:
+ case DELETING:
+ case DELETED:
+ case RECOVERING:
+ assertFalse(result.isOk());
+ assertEquals(Result.INELIGIBLE_CONTAINER_STATE, result.getResult());
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND),
commandCaptor.capture());
+ break;
+ default:
+ assertTrue(result.isOk());
+ assertEquals(Result.OK, result.getResult());
+ verify(eventPublisher, times(3)).fireEvent(eq(DATANODE_COMMAND),
commandCaptor.capture());
+ break;
+ }
+ }
+
+ // TODO HDDS-10714 will change which datanodes are eligible to participate
in reconciliation.
+ @Test
+ public void testReconcileSentToAllPeers() throws Exception {
+ addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+ Set<ContainerReplica> replicas = addReplicasToContainer(3);
+ Set<UUID> allNodeIDs = replicas.stream()
+ .map(r -> r.getDatanodeDetails().getUuid())
+ .collect(Collectors.toSet());
+
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+ assertTrue(result.isOk());
+ assertEquals(Result.OK, result.getResult());
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ assertEquals(3, replicas.size());
+ assertEquals(allNodeIDs.size(), replicas.size());
+ verify(eventPublisher,
times(replicas.size())).fireEvent(eq(DATANODE_COMMAND),
commandCaptor.capture());
+
+ // Check each reconcile command sent for correctness.
+ Set<UUID> nodesReceivingCommands = new HashSet<>();
+ for (CommandForDatanode<ReconcileContainerCommandProto> dnCommand:
commandCaptor.getAllValues()) {
+ SCMCommand<ReconcileContainerCommandProto> reconcileCommand =
dnCommand.getCommand();
+ ReconcileContainerCommandProto reconcileProto =
reconcileCommand.getProto();
+ // All commands should use the latest term of SCM so the datanode does
not drop them.
+ assertEquals(LEADER_TERM, reconcileCommand.getTerm());
+ // All commands should have the same container ID.
+ assertEquals(CONTAINER_ID,
ContainerID.valueOf(reconcileProto.getContainerID()));
+ // Container ID is also used as the command's identifier.
+ assertEquals(CONTAINER_ID,
ContainerID.valueOf(reconcileCommand.getId()));
+
+ // Every node should receive exactly one reconcile command.
+ UUID targetNodeID = dnCommand.getDatanodeId();
+ assertTrue(nodesReceivingCommands.add(targetNodeID), "Duplicate
reconcile command sent to datanode.");
+ // All commands should have correctly constructed peer lists that
exclude the node receiving the command.
+ Set<UUID> expectedPeerIDs = allNodeIDs.stream()
+ .filter(id -> !id.equals(targetNodeID))
+ .collect(Collectors.toSet());
+ Set<UUID> actualPeerIDs = reconcileProto.getPeersList().stream()
+ .map(dn -> UUID.fromString(dn.getUuid()))
+ .collect(Collectors.toSet());
+ assertEquals(replicas.size() - 1, actualPeerIDs.size());
+ assertEquals(expectedPeerIDs, actualPeerIDs);
+ }
+
+ assertEquals(allNodeIDs, nodesReceivingCommands);
+ }
+
+ @ParameterizedTest
+ @EnumSource(State.class)
+ public void testReconcileFailsWithIneligibleReplicas(State replicaState)
throws Exception {
+ // The overall container state is eligible for reconciliation, but some replicas may not be.
+ // If any replica is in an ineligible state, the container as a whole is not considered eligible.
+ addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+ // Only the first replica uses the parameterized state; the other two remain CLOSED.
+ addReplicasToContainer(replicaState, State.CLOSED, State.CLOSED);
+
+ EligibilityResult result =
+
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID,
containerManager);
+
+ eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+ switch (replicaState) {
+ case OPEN:
+ case INVALID:
+ case DELETED:
+ case CLOSING:
+ assertFalse(result.isOk());
+ assertEquals(Result.INELIGIBLE_REPLICA_STATES, result.getResult());
+ verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND),
commandCaptor.capture());
+ break;
+ default:
+ assertTrue(result.isOk());
+ assertEquals(Result.OK, result.getResult());
+ verify(eventPublisher, times(3)).fireEvent(eq(DATANODE_COMMAND),
commandCaptor.capture());
+ break;
+ }
+ }
+
+ private ContainerInfo addContainer(ReplicationConfig repConfig,
LifeCycleState state) throws Exception {
+ ContainerInfo container = new ContainerInfo.Builder()
+ .setContainerID(CONTAINER_ID.getId())
+ .setReplicationConfig(repConfig)
+ .setState(state)
+ .build();
+ when(containerManager.getContainer(CONTAINER_ID)).thenReturn(container);
+ return container;
+ }
+
+ private Set<ContainerReplica> addReplicasToContainer(int count) throws
Exception {
+ State[] replicaStates = new State[count];
+ Arrays.fill(replicaStates, State.CLOSED);
+ return addReplicasToContainer(replicaStates);
+ }
+
+ private Set<ContainerReplica> addReplicasToContainer(State... replicaStates)
throws Exception {
+ // Add one container replica for each replica state specified.
+ // If no states are specified, replica list will be empty.
+ Set<ContainerReplica> replicas = new HashSet<>();
+ try (MockNodeManager nodeManager = new MockNodeManager(true,
replicaStates.length)) {
+ List<DatanodeDetails> nodes = nodeManager.getAllNodes();
+ for (int i = 0; i < replicaStates.length; i++) {
+ replicas.addAll(HddsTestUtils.getReplicas(CONTAINER_ID,
replicaStates[i], nodes.get(i)));
+ }
+ }
+
when(containerManager.getContainerReplicas(CONTAINER_ID)).thenReturn(replicas);
+
+ return replicas;
+ }
+}
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 0dd52cd291..f46ee62fd8 100644
---
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -575,4 +575,8 @@ public class ContainerOperationClient implements ScmClient {
return storageContainerLocationClient.getMetrics(query);
}
+ @Override
+ public void reconcileContainer(long id) throws IOException {
+ storageContainerLocationClient.reconcileContainer(id);
+ }
}
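As an aside, the same RPC can also be driven programmatically through the ScmClient interface extended above. A hypothetical usage sketch (not part of this patch; it assumes a reachable SCM, admin privileges, and the ContainerOperationClient(OzoneConfiguration) constructor):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;

public final class ReconcileExample {
  public static void main(String[] args) throws Exception {
    long containerId = Long.parseLong(args[0]);
    try (ScmClient scmClient = new ContainerOperationClient(new OzoneConfiguration())) {
      // Delegates to the reconcileContainer call on StorageContainerLocationProtocol.
      scmClient.reconcileContainer(containerId);
    }
  }
}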
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
index 54c69273f0..9f93c56f2d 100644
---
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
@@ -43,7 +43,8 @@ import picocli.CommandLine.Spec;
CreateSubcommand.class,
CloseSubcommand.class,
ReportSubcommand.class,
- UpgradeSubcommand.class
+ UpgradeSubcommand.class,
+ ReconcileSubcommand.class
})
@MetaInfServices(SubcommandWithParent.class)
public class ContainerCommands implements Callable<Void>, SubcommandWithParent
{
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
new file mode 100644
index 0000000000..e747455a88
--- /dev/null
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.container;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+/**
+ * This is the handler that processes the container reconcile command.
+ */
+@Command(
+ name = "reconcile",
+ description = "Reconcile container replicas",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class ReconcileSubcommand extends ScmSubcommand {
+
+ @CommandLine.Parameters(description = "ID of the container to reconcile")
+ private long containerId;
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ scmClient.reconcileContainer(containerId);
+ System.out.println("Reconciliation has been triggered for container " +
containerId);
+ // TODO a better option to check status may be added later.
+ System.out.println("Use \"ozone admin container info --json " +
containerId + "\" to see the checksums of each " +
+ "container replica");
+ }
+}
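For reviewers who want to exercise the new subcommand in isolation, a minimal unit-test sketch is shown below. It is not part of this patch; it assumes JUnit 5, Mockito, and picocli are on the tools test classpath, and that picocli binds the positional container ID before execute(ScmClient) is invoked.

package org.apache.hadoop.hdds.scm.cli.container;

import java.io.IOException;

import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.junit.jupiter.api.Test;

import picocli.CommandLine;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
 * Illustrative sketch of a unit test for ReconcileSubcommand.
 */
public class TestReconcileSubcommand {

  @Test
  public void testReconcileDelegatesToScmClient() throws IOException {
    ReconcileSubcommand subcommand = new ReconcileSubcommand();
    // Bind the positional container ID the same way the CLI parser would.
    new CommandLine(subcommand).parseArgs("123");

    ScmClient scmClient = mock(ScmClient.class);
    subcommand.execute(scmClient);

    // The subcommand should delegate directly to the new reconcileContainer RPC.
    verify(scmClient).reconcileContainer(123L);
  }
}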
diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
index c50daa724d..fae0899178 100644
--- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
@@ -34,6 +34,12 @@ Container is closed
${output} = Execute ozone admin container info
"${container}"
Should contain ${output} CLOSED
+Reconciliation complete
+ [arguments] ${container}
+ ${data_checksum} = Execute ozone admin container info
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+ Should not be empty ${data_checksum}
+ Should not be equal as strings 0 ${data_checksum}
+
*** Test Cases ***
Create container
${output} = Execute ozone admin container create
@@ -71,6 +77,17 @@ Verbose container info
${output} = Execute ozone admin --verbose container info
"${CONTAINER}"
Should contain ${output} Pipeline Info
+Incomplete command
+ ${output} = Execute And Ignore Error ozone admin container
+ Should contain ${output} Incomplete command
+ Should contain ${output} list
+ Should contain ${output} info
+ Should contain ${output} create
+ Should contain ${output} close
+ Should contain ${output} reconcile
+ Should contain ${output} report
+ Should contain ${output} upgrade
+
List containers as JSON
${output} = Execute ozone admin container info
"${CONTAINER}" --json | jq -r '.'
Should contain ${output} containerInfo
@@ -84,23 +101,6 @@ Report containers as JSON
Should contain ${output} stats
Should contain ${output} samples
-Close container
- ${container} = Execute ozone admin container list --state
OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") |
.containerID' | head -1
- Execute ozone admin container close
"${container}"
- ${output} = Execute ozone admin container info
"${container}"
- Should contain ${output} CLOS
- Wait until keyword succeeds 1min 10sec Container is closed
${container}
-
-Incomplete command
- ${output} = Execute And Ignore Error ozone admin container
- Should contain ${output} Incomplete command
- Should contain ${output} list
- Should contain ${output} info
- Should contain ${output} create
- Should contain ${output} close
- Should contain ${output} report
- Should contain ${output} upgrade
-
#List containers on unknown host
# ${output} = Execute And Ignore Error ozone admin --verbose
container list --scm unknown-host
# Should contain ${output} Invalid host name
@@ -111,5 +111,38 @@ Cannot close container without admin privilege
Cannot create container without admin privilege
Requires admin privilege ozone admin container create
+Cannot reconcile container without admin privilege
+ Requires admin privilege ozone admin container reconcile "${CONTAINER}"
+
Reset user
Run Keyword if '${SECURITY_ENABLED}' == 'true' Kinit test user
testuser testuser.keytab
+
+Cannot reconcile open container
+ # At this point we should have an open Ratis Three container.
+ ${container} = Execute ozone admin container list --state
OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") |
.containerID' | head -n1
+ Execute and check rc ozone admin container reconcile "${container}"
255
+ # The container should not yet have any replica checksums.
+ # TODO When the scanner is computing checksums automatically, this test
may need to be updated.
+ ${data_checksum} = Execute ozone admin container info
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+ # 0 is the hex value of an empty checksum.
+ Should Be Equal As Strings 0 ${data_checksum}
+
+Close container
+ ${container} = Execute ozone admin container list --state
OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") |
.containerID' | head -1
+ Execute ozone admin container close
"${container}"
+ # The container may either be in CLOSED or CLOSING state at this point.
Once we have verified this, we will wait
+ # for it to progress to CLOSED.
+ ${output} = Execute ozone admin container info
"${container}"
+ Should contain ${output} CLOS
+ Wait until keyword succeeds 1min 10sec Container is closed
${container}
+
+Reconcile closed container
+ # Check that info does not show replica checksums, since manual
reconciliation has not yet been triggered.
+ # TODO When the scanner is computing checksums automatically, this test
may need to be updated.
+ ${container} = Execute ozone admin container list --state
CLOSED | jq -r 'select(.replicationConfig.replicationFactor == "THREE") |
.containerID' | head -1
+ ${data_checksum} = Execute ozone admin container info
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+ # 0 is the hex value of an empty checksum.
+ Should Be Equal As Strings 0 ${data_checksum}
+ # When reconciliation finishes, replica checksums should be shown.
+ Execute ozone admin container reconcile ${container}
+ Wait until keyword succeeds 1min 5sec Reconciliation complete
${container}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]