sumitagrawl commented on code in PR #8845:
URL: https://github.com/apache/ozone/pull/8845#discussion_r2228321692


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java:
##########
@@ -247,21 +354,72 @@ public void reinitialize(
     // before reinitialization. Just update deletedTable here.
     Preconditions.checkArgument(deletingTxIDs.isEmpty());
     this.deletedTable = deletedBlocksTXTable;
+    this.statefulConfigTable = configTable;
   }
 
   public static Builder newBuilder() {
     return new Builder();
   }
 
+  @Override
+  public DeletedBlocksTransactionSummary getTransactionSummary() {
+    return DeletedBlocksTransactionSummary.newBuilder()
+        .setFirstTxID(firstTxIdForDataDistribution)
+        .setTotalTransactionCount(totalTxCount.get())
+        .setTotalBlockCount(totalBlockCount.get())
+        .setTotalBlockSize(totalBlocksSize.get())
+        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
+        .build();
+  }
+
+  private void initDataDistributionData() throws IOException {
+    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {

Review Comment:
   We can keep populating the in-memory information; no feature flag is needed here, as there is no rollback impact. A minimal sketch of that simplification is below.
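   
   Sketch of the suggestion, reusing the field names from this patch (not the final shape):
   
   ```java
   // In-memory counters can be restored unconditionally; only writes to
   // the DB or RPCs to other services need to stay behind the flag.
   private void initDataDistributionData() throws IOException {
     DeletedBlocksTransactionSummary summary = loadDeletedBlocksSummary();
     if (summary != null) {
       isFirstTxIdForDataDistributionSet = true;
       firstTxIdForDataDistribution = summary.getFirstTxID();
       totalTxCount.set(summary.getTotalTransactionCount());
       totalBlockCount.set(summary.getTotalBlockCount());
       totalBlocksSize.set(summary.getTotalBlockSize());
       totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize());
     }
   }
   ```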



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/ScmOnFinalizeActionForDataDistribution.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.upgrade;
+
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATA_DISTRIBUTION;
+import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
+import static org.apache.hadoop.ozone.upgrade.UpgradeActionHdds.Component.SCM;
+
+import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeActionHdds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SCM Upgrade Action for data distribution.
+ */
+@UpgradeActionHdds(feature = DATA_DISTRIBUTION, component = SCM, type =
+    ON_FINALIZE)
+public class ScmOnFinalizeActionForDataDistribution implements
+    HDDSUpgradeAction<SCMUpgradeFinalizationContext> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ScmOnFinalizeActionForDataDistribution.class);
+
+  @Override
+  public void execute(SCMUpgradeFinalizationContext context) throws Exception {
+    // Action is executed on every SCM, not only leader
+    LOG.info("Executing SCM On Finalize action for layout feature {}",

Review Comment:
   We need not define an Action handler if there is no action to perform.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java:
##########
@@ -154,21 +187,95 @@ public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
       containerIdToTxnIdMap.compute(ContainerID.valueOf(tx.getContainerID()),
           (k, v) -> v != null && v > tid ? v : tid);
       transactionBuffer.addToBuffer(deletedTable, tx.getTxID(), tx);
+      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+          tx.hasTotalBlockReplicatedSize()) {
+        if (!isFirstTxIdForDataDistributionSet) {
+          // set the first transaction ID for data distribution
+          isFirstTxIdForDataDistributionSet = true;
+          firstTxIdForDataDistribution = tx.getTxID();
+        }
+        transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
+            incrDeletedBlocksSummary(tx).toByteString());
+      }
     }
+
     containerManager.updateDeleteTransactionId(containerIdToTxnIdMap);
   }
 
+  private DeletedBlocksTransactionSummary incrDeletedBlocksSummary(DeletedBlocksTransaction tx) {
+    totalTxCount.addAndGet(1);
+    totalBlockCount.addAndGet(tx.getLocalIDCount());
+    totalBlocksSize.addAndGet(tx.getTotalBlockSize());
+    totalReplicatedBlocksSize.addAndGet(tx.getTotalBlockReplicatedSize());
+    return DeletedBlocksTransactionSummary.newBuilder()
+        .setFirstTxID(firstTxIdForDataDistribution)
+        .setTotalTransactionCount(totalTxCount.get())
+        .setTotalBlockCount(totalBlockCount.get())
+        .setTotalBlockSize(totalBlocksSize.get())
+        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
+        .build();
+  }
+
   @Override
   public void removeTransactionsFromDB(ArrayList<Long> txIDs)
       throws IOException {
     if (deletingTxIDs != null) {
       deletingTxIDs.addAll(txIDs);
     }
+
     for (Long txID : txIDs) {
       transactionBuffer.removeFromBuffer(deletedTable, txID);
+      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+          txID >= firstTxIdForDataDistribution) {
+        DeletedBlockLogImpl.TxBlockInfo txBlockInfo = txBlockInfoMap.remove(txID);
+        if (txBlockInfo != null) {
+          transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
+              descDeletedBlocksSummary(txBlockInfo).toByteString());
+          metrics.incrBlockDeletionTransactionSizeFromCache();
+        } else {
+          // Fetch the transaction from DB to get the size. This happens during
+          // 1. SCM leader transfer, deletion command send by one SCM,
+          //    while the deletion ack received by a different SCM
+          // 2. SCM restarts, txBlockInfoMap is empty, while receiving the deletion ack from DN
+          DeletedBlocksTransaction tx = deletedTable.get(txID);
+          if (tx != null && tx.hasTotalBlockReplicatedSize()) {
+            transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
+                descDeletedBlocksSummary(tx).toByteString());
+          }
+          metrics.incrBlockDeletionTransactionSizeFromDB();

Review Comment:
   When there is a switchover, the block deletion will be retried, since this information will not be present in the follower's txCommitMap, so this response will simply be ignored. But the metric may give wrong or confusing information if this is persisted to the DB. Perhaps each SCM should persist only the transactions it handled itself, as a local update exclusive to that SCM.
   
   An additional point: how is the summary synced across SCMs? If it uses the stateful service, as it appears to, how is consistency maintained for the case above?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java:
##########
@@ -247,21 +354,72 @@ public void reinitialize(
     // before reinitialization. Just update deletedTable here.
     Preconditions.checkArgument(deletingTxIDs.isEmpty());
     this.deletedTable = deletedBlocksTXTable;
+    this.statefulConfigTable = configTable;
   }
 
   public static Builder newBuilder() {
     return new Builder();
   }
 
+  @Override
+  public DeletedBlocksTransactionSummary getTransactionSummary() {
+    return DeletedBlocksTransactionSummary.newBuilder()
+        .setFirstTxID(firstTxIdForDataDistribution)
+        .setTotalTransactionCount(totalTxCount.get())
+        .setTotalBlockCount(totalBlockCount.get())
+        .setTotalBlockSize(totalBlocksSize.get())
+        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
+        .build();
+  }
+
+  private void initDataDistributionData() throws IOException {
+    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {
+      DeletedBlocksTransactionSummary summary = loadDeletedBlocksSummary();
+      if (summary != null) {
+        isFirstTxIdForDataDistributionSet = true;
+        firstTxIdForDataDistribution = summary.getFirstTxID();
+        totalTxCount.set(summary.getTotalTransactionCount());
+        totalBlockCount.set(summary.getTotalBlockCount());
+        totalBlocksSize.set(summary.getTotalBlockSize());
+        totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize());
+        LOG.info("Data distribution is enabled with totalBlockCount {} totalBlocksSize {} lastTxIdBeforeUpgrade {}",
+            totalBlockCount.get(), totalBlocksSize.get(), firstTxIdForDataDistribution);
+      }
+    } else {
+      LOG.info(HDDSLayoutFeature.DATA_DISTRIBUTION + " is not finalized");
+    }
+  }
+
+  private DeletedBlocksTransactionSummary loadDeletedBlocksSummary() throws IOException {
+    try {
+      ByteString byteString = statefulConfigTable.get(SERVICE_NAME);
+      if (byteString == null) {
+        // for a new Ozone cluster, property not found is an expected state.
+        LOG.info("Property {} for service {} not found. ",
+            DeletedBlocksTransactionSummary.class.getSimpleName(), SERVICE_NAME);
+        return null;
+      }
+      return DeletedBlocksTransactionSummary.class.cast(ReflectionUtil.getMethod(

Review Comment:
   It is using the DeletedBlocksTransactionSummary proto parser via reflection to serialize/deserialize; can we define a Codec to do the same instead of using a cast?
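   
   A minimal sketch of the direct alternative (assuming only the generated proto class; parseFrom(ByteString) is part of every generated proto2 message):
   
   ```java
   // Replace the reflective getMethod(...) + cast with the generated parser.
   private DeletedBlocksTransactionSummary loadDeletedBlocksSummary()
       throws IOException {
     ByteString byteString = statefulConfigTable.get(SERVICE_NAME);
     if (byteString == null) {
       return null;
     }
     // InvalidProtocolBufferException extends IOException, so the
     // method signature stays the same.
     return DeletedBlocksTransactionSummary.parseFrom(byteString);
   }
   ```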



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -229,13 +246,23 @@ private int resetRetryCount(List<Long> txIDs) throws IOException {
   }
 
   private DeletedBlocksTransaction constructNewTransaction(
-      long txID, long containerID, List<Long> blocks) {
-    return DeletedBlocksTransaction.newBuilder()
+      long txID, long containerID, List<DeletedBlock> blocks) {
+    List<Long> localIdList = blocks.stream().map(b -> b.getBlockID().getLocalID()).collect(Collectors.toList());
+    DeletedBlocksTransaction.Builder builder = DeletedBlocksTransaction.newBuilder()
         .setTxID(txID)
         .setContainerID(containerID)
-        .addAllLocalID(blocks)
-        .setCount(0)
-        .build();
+        .addAllLocalID(localIdList)
+        .setCount(0);
+
+    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {
+      long replicatedSize = blocks.stream().mapToLong(DeletedBlock::getReplicatedSize).sum();
+      // even when HDDSLayoutFeature.DATA_DISTRIBUTION is finalized, old OM can still call the old API
+      if (replicatedSize >= 0) {
+        builder.setTotalBlockReplicatedSize(replicatedSize);
+        builder.setTotalBlockSize(blocks.stream().mapToLong(DeletedBlock::getSize).sum());

Review Comment:
   If initialized with -1, this will give a wrong value; better to keep 0. This can happen if the request is received from the OM service before SCM finalization.
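   
   One hedged way to guard it (whether to clamp per-block sizes to 0 or skip the summary entirely is a design choice; this only sketches the clamp):
   
   ```java
   // Clamp unknown (-1) sizes to 0 so a single old-API request cannot
   // push the persisted totals negative.
   long replicatedSize = blocks.stream()
       .mapToLong(b -> Math.max(0L, b.getReplicatedSize()))
       .sum();
   builder.setTotalBlockReplicatedSize(replicatedSize);
   builder.setTotalBlockSize(blocks.stream()
       .mapToLong(b -> Math.max(0L, b.getSize()))
       .sum());
   ```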



##########
hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto:
##########
@@ -368,6 +368,8 @@ message DeletedBlocksTransaction {
   repeated int64 localID = 3;
   // the retry time of sending deleting command to datanode.
   required int32 count = 4;
+  optional uint64 totalBlockSize = 5;

Review Comment:
   What will the default value be for old transactions already in the table, when they are read in getTransactions() to populate the response sent to the DN?
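   
   For reference, the proto2 semantics on the read side (the -1 sentinel below is illustrative only):
   
   ```java
   // Rows written before the upgrade never set the optional field, so
   // hasTotalBlockSize() is false and getTotalBlockSize() returns the
   // proto2 default of 0; readers must branch on the has* accessor.
   long totalSize = txn.hasTotalBlockSize()
       ? txn.getTotalBlockSize()  // written post-finalization, trustworthy
       : -1L;                     // old transaction: unknown, not a real 0
   ```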



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java:
##########
@@ -1004,6 +1006,23 @@ public int resetDeletedBlockRetryCount(List<Long> txIDs) throws IOException {
     }
   }
 
+  @Nullable
+  @Override
+  public DeletedBlocksTransactionSummary getDeletedBlockSummary() {
+    final Map<String, String> auditMap = Maps.newHashMap();
+    try {
+      DeletedBlocksTransactionSummary summary =
+          scm.getScmBlockManager().getDeletedBlockLog().getTransactionSummary();
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(

Review Comment:
   This needs to be logReadSuccess() or logReadFailure().
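   
   Roughly, on the success path (the SCMAction constant name below is assumed; the failure path would use logReadFailure(buildAuditMessageForFailure(...)) in the same way):
   
   ```java
   // Read-only RPC: audit with the read helpers rather than the write ones.
   DeletedBlocksTransactionSummary summary =
       scm.getScmBlockManager().getDeletedBlockLog().getTransactionSummary();
   AUDIT.logReadSuccess(buildAuditMessageForSuccess(
       SCMAction.GET_DELETED_BLOCK_SUMMARY, auditMap));
   return summary;
   ```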



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
           keyValue = iter.next();
           DeletedBlocksTransaction txn = keyValue.getValue();
           final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+          if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&

Review Comment:
   Since this is just in-memory state, we can keep building it instead of rebuilding on finalize(), and there is no need to check isFinalized() here.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
           keyValue = iter.next();
           DeletedBlocksTransaction txn = keyValue.getValue();
           final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+          if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+              txn.hasTotalBlockReplicatedSize()) {
+            txSizeMap.put(txn.getTxID(),

Review Comment:
   We can move this population to getTransactions(), where the transactions being sent to the DN are collected. The entries also need to be removed at some point; I cannot find any code that removes them.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java:
##########
@@ -272,6 +315,19 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
           .addGauge(DatanodeCommandDetails.BLOCKS_SENT_TO_DN_COMMAND,
           e.getValue().getBlocksSent());
     }
+
+    // add metrics for deleted block transaction summary
+    if (blockManager.getDeletedBlockLog().isTransactionSummarySupported()) {

Review Comment:
   We do not need a feature flag here. A flag should only be added for cases that have rollback impact, like:
   1. saving to the DB
   2. communication with other services
   Metrics are fine to record unconditionally, as sketched below.
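   
   i.e. the summary gauges can be emitted without the guard (a sketch; the gauge wiring stays as in the current patch):
   
   ```java
   // Metrics are memory-only and have no rollback impact, so no
   // isTransactionSummarySupported() / feature check is needed here.
   DeletedBlocksTransactionSummary summary =
       blockManager.getDeletedBlockLog().getTransactionSummary();
   // ... add the gauges from `summary` exactly as the patch does today,
   // just without the surrounding if.
   ```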



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
           keyValue = iter.next();
           DeletedBlocksTransaction txn = keyValue.getValue();
           final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+          if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+              txn.hasTotalBlockReplicatedSize()) {
+            txSizeMap.put(txn.getTxID(),

Review Comment:
   Additionally, after the refactoring, all per-transaction tracking lives in the org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager class. Maybe this can also be moved into that class for consistency, like transactionToDNsCommitMap and transactionToRetryCountMap.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
           keyValue = iter.next();
           DeletedBlocksTransaction txn = keyValue.getValue();
           final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+          if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+              txn.hasTotalBlockReplicatedSize()) {
+            txSizeMap.put(txn.getTxID(),

Review Comment:
   Cleanup of this map can be done on the various events like timeout, datanode-dead, or others, in a common place.
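   
   A hypothetical sketch of that centralisation (helper name and call sites are illustrative only):
   
   ```java
   // Retire the size entry wherever a transaction stops being tracked, so
   // the map cannot leak. Invoke this from ack handling, timeout sweeps,
   // and datanode-dead handling alike.
   private void onTransactionRetired(long txID) {
     txSizeMap.remove(txID);
   }
   ```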



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

