sumitagrawl commented on code in PR #8845:
URL: https://github.com/apache/ozone/pull/8845#discussion_r2228321692
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java: ##########

@@ -247,21 +354,72 @@ public void reinitialize(
     // before reinitialization. Just update deletedTable here.
     Preconditions.checkArgument(deletingTxIDs.isEmpty());
     this.deletedTable = deletedBlocksTXTable;
+    this.statefulConfigTable = configTable;
   }

   public static Builder newBuilder() {
     return new Builder();
   }

+  @Override
+  public DeletedBlocksTransactionSummary getTransactionSummary() {
+    return DeletedBlocksTransactionSummary.newBuilder()
+        .setFirstTxID(firstTxIdForDataDistribution)
+        .setTotalTransactionCount(totalTxCount.get())
+        .setTotalBlockCount(totalBlockCount.get())
+        .setTotalBlockSize(totalBlocksSize.get())
+        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
+        .build();
+  }
+
+  private void initDataDistributionData() throws IOException {
+    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {

Review Comment:
   We can keep populating the in-memory information unconditionally; no feature flag is needed here, since in-memory state has no rollback impact.
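   For illustration, one way to split it (a rough sketch only; the method name trackTransaction is hypothetical, while the fields and helpers are the ones from the diff above):

   // Sketch: maintain the in-memory summary unconditionally, and gate only
   // the persisted copy behind the layout-feature check.
   private void trackTransaction(DeletedBlocksTransaction tx) throws IOException {
     // In-memory counters: harmless on rollback, so no feature check needed.
     DeletedBlocksTransactionSummary summary = incrDeletedBlocksSummary(tx);

     // On-disk state: keep this behind the check so a rollback never sees
     // post-upgrade data in the stateful config table.
     if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {
       transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
           summary.toByteString());
     }
   }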
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/ScmOnFinalizeActionForDataDistribution.java: ##########

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.upgrade;
+
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATA_DISTRIBUTION;
+import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
+import static org.apache.hadoop.ozone.upgrade.UpgradeActionHdds.Component.SCM;
+
+import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeActionHdds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SCM Upgrade Action for data distribution.
+ */
+@UpgradeActionHdds(feature = DATA_DISTRIBUTION, component = SCM, type =
+    ON_FINALIZE)
+public class ScmOnFinalizeActionForDataDistribution implements
+    HDDSUpgradeAction<SCMUpgradeFinalizationContext> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ScmOnFinalizeActionForDataDistribution.class);
+
+  @Override
+  public void execute(SCMUpgradeFinalizationContext context) throws Exception {
+    // Action is executed on every SCM, not only leader
+    LOG.info("Executing SCM On Finalize action for layout feature {}",

Review Comment:
   We need not define an upgrade action handler when there is no actual action to perform.

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java: ##########

@@ -154,21 +187,95 @@ public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
       containerIdToTxnIdMap.compute(ContainerID.valueOf(tx.getContainerID()),
          (k, v) -> v != null && v > tid ? v : tid);
       transactionBuffer.addToBuffer(deletedTable, tx.getTxID(), tx);
+      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+          tx.hasTotalBlockReplicatedSize()) {
+        if (!isFirstTxIdForDataDistributionSet) {
+          // set the first transaction ID for data distribution
+          isFirstTxIdForDataDistributionSet = true;
+          firstTxIdForDataDistribution = tx.getTxID();
+        }
+        transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
+            incrDeletedBlocksSummary(tx).toByteString());
+      }
     }
+    containerManager.updateDeleteTransactionId(containerIdToTxnIdMap);
   }

+  private DeletedBlocksTransactionSummary incrDeletedBlocksSummary(DeletedBlocksTransaction tx) {
+    totalTxCount.addAndGet(1);
+    totalBlockCount.addAndGet(tx.getLocalIDCount());
+    totalBlocksSize.addAndGet(tx.getTotalBlockSize());
+    totalReplicatedBlocksSize.addAndGet(tx.getTotalBlockReplicatedSize());
+    return DeletedBlocksTransactionSummary.newBuilder()
+        .setFirstTxID(firstTxIdForDataDistribution)
+        .setTotalTransactionCount(totalTxCount.get())
+        .setTotalBlockCount(totalBlockCount.get())
+        .setTotalBlockSize(totalBlocksSize.get())
+        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
+        .build();
+  }
+
   @Override
   public void removeTransactionsFromDB(ArrayList<Long> txIDs)
       throws IOException {
     if (deletingTxIDs != null) {
       deletingTxIDs.addAll(txIDs);
     }
+
     for (Long txID : txIDs) {
       transactionBuffer.removeFromBuffer(deletedTable, txID);
+      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+          txID >= firstTxIdForDataDistribution) {
+        DeletedBlockLogImpl.TxBlockInfo txBlockInfo = txBlockInfoMap.remove(txID);
+        if (txBlockInfo != null) {
+          transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
+              descDeletedBlocksSummary(txBlockInfo).toByteString());
+          metrics.incrBlockDeletionTransactionSizeFromCache();
+        } else {
+          // Fetch the transaction from DB to get the size. This happens during
+          // 1. SCM leader transfer, deletion command send by one SCM,
+          //    while the deletion ack received by a different SCM
+          // 2. SCM restarts, txBlockInfoMap is empty, while receiving the deletion ack from DN
+          DeletedBlocksTransaction tx = deletedTable.get(txID);
+          if (tx != null && tx.hasTotalBlockReplicatedSize()) {
+            transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME,
+                descDeletedBlocksSummary(tx).toByteString());
+          }
+          metrics.incrBlockDeletionTransactionSizeFromDB();

Review Comment:
   When there is a switchover, the block deletion will be retried, because this information is not present in the follower's txCommitMap; the late response is then simply ignored. Persisting it to the DB in that case may make this metric misleading or confusing. Perhaps each SCM should persist, as a local update, only the transactions it handled itself. A further question: how is the summary synchronized across SCMs? It appears to use the stateful service; if so, how is consistency maintained in the case above?

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java: ##########

@@ -247,21 +354,72 @@ public void reinitialize(
     // before reinitialization. Just update deletedTable here.
     Preconditions.checkArgument(deletingTxIDs.isEmpty());
     this.deletedTable = deletedBlocksTXTable;
+    this.statefulConfigTable = configTable;
   }

   public static Builder newBuilder() {
     return new Builder();
   }

+  @Override
+  public DeletedBlocksTransactionSummary getTransactionSummary() {
+    return DeletedBlocksTransactionSummary.newBuilder()
+        .setFirstTxID(firstTxIdForDataDistribution)
+        .setTotalTransactionCount(totalTxCount.get())
+        .setTotalBlockCount(totalBlockCount.get())
+        .setTotalBlockSize(totalBlocksSize.get())
+        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
+        .build();
+  }
+
+  private void initDataDistributionData() throws IOException {
+    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {
+      DeletedBlocksTransactionSummary summary = loadDeletedBlocksSummary();
+      if (summary != null) {
+        isFirstTxIdForDataDistributionSet = true;
+        firstTxIdForDataDistribution = summary.getFirstTxID();
+        totalTxCount.set(summary.getTotalTransactionCount());
+        totalBlockCount.set(summary.getTotalBlockCount());
+        totalBlocksSize.set(summary.getTotalBlockSize());
+        totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize());
+        LOG.info("Data distribution is enabled with totalBlockCount {} totalBlocksSize {} lastTxIdBeforeUpgrade {}",
+            totalBlockCount.get(), totalBlocksSize.get(), firstTxIdForDataDistribution);
+      }
+    } else {
+      LOG.info(HDDSLayoutFeature.DATA_DISTRIBUTION + " is not finalized");
+    }
+  }
+
+  private DeletedBlocksTransactionSummary loadDeletedBlocksSummary() throws IOException {
+    try {
+      ByteString byteString = statefulConfigTable.get(SERVICE_NAME);
+      if (byteString == null) {
+        // for a new Ozone cluster, property not found is an expected state.
+        LOG.info("Property {} for service {} not found. ",
+            DeletedBlocksTransactionSummary.class.getSimpleName(), SERVICE_NAME);
+        return null;
+      }
+      return DeletedBlocksTransactionSummary.class.cast(ReflectionUtil.getMethod(

Review Comment:
   This uses reflection and a cast to reach the DeletedBlocksTransactionSummary proto parser for serialization/deserialization. Can we define a codec to do the same instead of casting?
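   One possible shape for that (a sketch only; it relies on the generated proto class exposing the usual parseFrom(ByteString), which protobuf-generated Java messages do):

   // Sketch: use the generated parser directly instead of reflection + cast.
   private DeletedBlocksTransactionSummary loadDeletedBlocksSummary()
       throws IOException {
     ByteString byteString = statefulConfigTable.get(SERVICE_NAME);
     if (byteString == null) {
       // Expected on a fresh cluster: the summary has never been written.
       return null;
     }
     // InvalidProtocolBufferException extends IOException, so this
     // propagates through the existing signature.
     return DeletedBlocksTransactionSummary.parseFrom(byteString);
   }

   Alternatively, if the table were typed to the summary message instead of raw bytes, registering a proper codec for it (for example via Ozone's Proto2Codec, if it is applicable to this table) would remove the manual parsing entirely.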
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java: ##########

@@ -229,13 +246,23 @@ private int resetRetryCount(List<Long> txIDs) throws IOException {
   }

   private DeletedBlocksTransaction constructNewTransaction(
-      long txID, long containerID, List<Long> blocks) {
-    return DeletedBlocksTransaction.newBuilder()
+      long txID, long containerID, List<DeletedBlock> blocks) {
+    List<Long> localIdList = blocks.stream().map(b -> b.getBlockID().getLocalID()).collect(Collectors.toList());
+    DeletedBlocksTransaction.Builder builder = DeletedBlocksTransaction.newBuilder()
         .setTxID(txID)
         .setContainerID(containerID)
-        .addAllLocalID(blocks)
-        .setCount(0)
-        .build();
+        .addAllLocalID(localIdList)
+        .setCount(0);
+
+    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION)) {
+      long replicatedSize = blocks.stream().mapToLong(DeletedBlock::getReplicatedSize).sum();
+      // even when HDDSLayoutFeature.DATA_DISTRIBUTION is finalized, old OM can still call the old API
+      if (replicatedSize >= 0) {
+        builder.setTotalBlockReplicatedSize(replicatedSize);
+        builder.setTotalBlockSize(blocks.stream().mapToLong(DeletedBlock::getSize).sum());

Review Comment:
   If a block's size is initialized with -1, this sum will produce a wrong value; better to keep it at 0. This can happen when the OM service sends a request before SCM finalization.
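   A small sketch of that defensive handling (illustrative only; clamping at read time is one way to realize "keep 0" if a DeletedBlock can still arrive carrying -1 sizes):

   // Sketch: never let a -1 sentinel from an old/pre-finalization request
   // leak into the persisted totals; treat unknown sizes as 0.
   long replicatedSize = blocks.stream()
       .mapToLong(b -> Math.max(0L, b.getReplicatedSize()))
       .sum();
   long totalSize = blocks.stream()
       .mapToLong(b -> Math.max(0L, b.getSize()))
       .sum();
   builder.setTotalBlockReplicatedSize(replicatedSize);
   builder.setTotalBlockSize(totalSize);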
########## hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto: ##########

@@ -368,6 +368,8 @@ message DeletedBlocksTransaction {
   repeated int64 localID = 3;
   // the retry time of sending deleting command to datanode.
   required int32 count = 4;
+  optional uint64 totalBlockSize = 5;

Review Comment:
   What will the default value be for old transactions already in the table, when they are read in getTransactions() and populated into the command sent to the DN?
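   For reference, proto2 answers part of this: an unset optional uint64 reads back as 0, and the generated has...() method distinguishes pre-upgrade rows. A read-side guard could then look like this (sketch only; the map shape follows the txSizeMap usage in the diff below):

   // Sketch: rows written before the upgrade carry no totalBlockSize, so
   // hasTotalBlockSize() is false and getTotalBlockSize() would return 0.
   if (txn.hasTotalBlockSize()) {
     txSizeMap.put(txn.getTxID(), txn.getTotalBlockSize());
   }
   // else: pre-upgrade transaction; skip it rather than recording a
   // misleading zero-byte entry.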
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java: ##########

@@ -1004,6 +1006,23 @@ public int resetDeletedBlockRetryCount(List<Long> txIDs) throws IOException {
     }
   }

+  @Nullable
+  @Override
+  public DeletedBlocksTransactionSummary getDeletedBlockSummary() {
+    final Map<String, String> auditMap = Maps.newHashMap();
+    try {
+      DeletedBlocksTransactionSummary summary =
+          scm.getScmBlockManager().getDeletedBlockLog().getTransactionSummary();
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(

Review Comment:
   This needs to be logReadSuccess() or logReadFailure(): fetching the summary is a read operation.

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java: ##########

@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
         keyValue = iter.next();
         DeletedBlocksTransaction txn = keyValue.getValue();
         final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+        if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&

Review Comment:
   Since this is in-memory only, we can keep building it all the time instead of rebuilding it on finalize(); the isFinalized() check is then not needed here.

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java: ##########

@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
         keyValue = iter.next();
        DeletedBlocksTransaction txn = keyValue.getValue();
         final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+        if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+            txn.hasTotalBlockReplicatedSize()) {
+          txSizeMap.put(txn.getTxID(),

Review Comment:
   We could populate this only for the transactions actually being sent to the DN, at the point in getTransactions() where they are collected. Also, the entries need to be removed at some point -- I cannot find any code that removes them.

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java: ##########

@@ -272,6 +315,19 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
           .addGauge(DatanodeCommandDetails.BLOCKS_SENT_TO_DN_COMMAND,
               e.getValue().getBlocksSent());
     }
+
+    // add metrics for deleted block transaction summary
+    if (blockManager.getDeletedBlockLog().isTransactionSummarySupported()) {

Review Comment:
   We do not need a feature flag here. Flags should be added only for cases where rollback has an impact, such as:
   1. saving to the DB
   2. communication with other services
   Metrics are fine to record.

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java: ##########

@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
         keyValue = iter.next();
         DeletedBlocksTransaction txn = keyValue.getValue();
         final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+        if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+            txn.hasTotalBlockReplicatedSize()) {
+          txSizeMap.put(txn.getTxID(),

Review Comment:
   Additionally, after the refactoring, all per-transaction tracking was moved to the org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager class. Perhaps this map should move to the same class for consistency, like transactionToDNsCommitMap and transactionToRetryCountMap.

########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java: ##########

@@ -439,6 +468,11 @@ public DatanodeDeletedBlockTransactions getTransactions(
         keyValue = iter.next();
         DeletedBlocksTransaction txn = keyValue.getValue();
         final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+        if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.DATA_DISTRIBUTION) &&
+            txn.hasTotalBlockReplicatedSize()) {
+          txSizeMap.put(txn.getTxID(),

Review Comment:
   Cleanup of this map can then be done in one common place, on events such as timeout, datanode death, and others.
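   To make the last two points concrete, a rough sketch of the map living in SCMDeletedBlockTransactionStatusManager next to the existing tracking maps (only transactionToDNsCommitMap and transactionToRetryCountMap come from the code above; every other name here is hypothetical):

   // Sketch inside SCMDeletedBlockTransactionStatusManager: keep the size
   // tracking beside the other per-transaction maps so that one shared
   // cleanup path (commit ack, timeout, dead datanode) covers all of them.
   // Assumes java.util.concurrent.ConcurrentHashMap is imported.
   private final Map<Long, Long> transactionToSizeMap = new ConcurrentHashMap<>();

   void recordTransactionSize(long txID, long replicatedSize) {
     transactionToSizeMap.put(txID, replicatedSize);
   }

   // Called from the single, common cleanup path.
   void clearTransaction(long txID) {
     transactionToDNsCommitMap.remove(txID);
     transactionToRetryCountMap.remove(txID);
     transactionToSizeMap.remove(txID);
   }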