http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java index 22039d1..2552cf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO; +import java.io.FileNotFoundException; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; @@ -31,6 +32,7 @@ import java.util.Map; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.fs.FileEncryptionInfo; @@ -42,15 +44,22 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ZoneEncryptionInfoProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; +import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo; import org.apache.hadoop.security.SecurityUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.util.Time; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE; import static org.apache.hadoop.util.Time.monotonicNow; /** @@ -216,18 +225,206 @@ final class FSDirEncryptionZoneOp { } } + static void reencryptEncryptionZone(final FSDirectory fsd, + final String zone, final String keyVersionName, + final boolean logRetryCache) throws IOException { + final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); + final FSPermissionChecker pc = fsd.getPermissionChecker(); + fsd.writeLock(); + try { + final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE); + final XAttr xattr = fsd.ezManager + .reencryptEncryptionZone(iip, keyVersionName); + xAttrs.add(xattr); + } finally { + fsd.writeUnlock(); + } + fsd.getEditLog().logSetXAttrs(zone, xAttrs, logRetryCache); + } + + static void cancelReencryptEncryptionZone(final FSDirectory fsd, + final String zone, final boolean logRetryCache) throws IOException { + final List<XAttr> xattrs; + final FSPermissionChecker pc = fsd.getPermissionChecker(); + fsd.writeLock(); + try { + final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE); + xattrs = fsd.ezManager.cancelReencryptEncryptionZone(iip); + } finally { + fsd.writeUnlock(); + } + if (xattrs != null && !xattrs.isEmpty()) { + fsd.getEditLog().logSetXAttrs(zone, xattrs, logRetryCache); + } + } + + static BatchedListEntries<ZoneReencryptionStatus> 
listReencryptionStatus( + final FSDirectory fsd, final long prevId) + throws IOException { + fsd.readLock(); + try { + return fsd.ezManager.listReencryptionStatus(prevId); + } finally { + fsd.readUnlock(); + } + } + + /** + * Update re-encryption progress (submitted). Caller should + * logSync after calling this, outside of the FSN lock. + * <p> + * The reencryption status is updated during SetXAttrs. + */ + static XAttr updateReencryptionSubmitted(final FSDirectory fsd, + final INodesInPath iip, final String ezKeyVersionName) + throws IOException { + assert fsd.hasWriteLock(); + Preconditions.checkNotNull(ezKeyVersionName, "ezKeyVersionName is null."); + final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip); + Preconditions.checkNotNull(zoneProto, "ZoneEncryptionInfoProto is null."); + + final ReencryptionInfoProto newProto = PBHelperClient + .convert(ezKeyVersionName, Time.now(), false, 0, 0, null, null); + final ZoneEncryptionInfoProto newZoneProto = PBHelperClient + .convert(PBHelperClient.convert(zoneProto.getSuite()), + PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()), + zoneProto.getKeyName(), newProto); + + final XAttr xattr = XAttrHelper + .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray()); + final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1); + xattrs.add(xattr); + FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xattrs, + EnumSet.of(XAttrSetFlag.REPLACE)); + return xattr; + } + + /** + * Update re-encryption progress (start, checkpoint). Caller should + * logSync after calling this, outside of the FSN lock. + * <p> + * The reencryption status is updated during SetXAttrs. + * Original reencryption status is passed in to get existing information + * such as ezkeyVersionName and submissionTime. 
+ */ + static XAttr updateReencryptionProgress(final FSDirectory fsd, + final INode zoneNode, final ZoneReencryptionStatus origStatus, + final String lastFile, final long numReencrypted, final long numFailures) + throws IOException { + assert fsd.hasWriteLock(); + Preconditions.checkNotNull(zoneNode, "Zone node is null"); + INodesInPath iip = INodesInPath.fromINode(zoneNode); + final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip); + Preconditions.checkNotNull(zoneProto, "ZoneEncryptionInfoProto is null."); + Preconditions.checkNotNull(origStatus, "Null status for " + iip.getPath()); + + final ReencryptionInfoProto newProto = PBHelperClient + .convert(origStatus.getEzKeyVersionName(), + origStatus.getSubmissionTime(), false, + origStatus.getFilesReencrypted() + numReencrypted, + origStatus.getNumReencryptionFailures() + numFailures, null, + lastFile); + + final ZoneEncryptionInfoProto newZoneProto = PBHelperClient + .convert(PBHelperClient.convert(zoneProto.getSuite()), + PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()), + zoneProto.getKeyName(), newProto); + + final XAttr xattr = XAttrHelper + .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray()); + final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1); + xattrs.add(xattr); + FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xattrs, + EnumSet.of(XAttrSetFlag.REPLACE)); + return xattr; + } + + /** + * Log re-encrypt complete (cancel, or 100% re-encrypt) to edits. + * Caller should logSync after calling this, outside of the FSN lock. + * <p> + * Original reencryption status is passed in to get existing information, + * this should include whether it is finished due to cancellation. + * The reencryption status is updated during SetXAttrs for completion time. 
+ */ + static List<XAttr> updateReencryptionFinish(final FSDirectory fsd, + final INodesInPath zoneIIP, final ZoneReencryptionStatus origStatus) + throws IOException { + assert origStatus != null; + assert fsd.hasWriteLock(); + fsd.ezManager.getReencryptionStatus() + .markZoneCompleted(zoneIIP.getLastINode().getId()); + final XAttr xattr = + generateNewXAttrForReencryptionFinish(zoneIIP, origStatus); + final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1); + xattrs.add(xattr); + FSDirXAttrOp.unprotectedSetXAttrs(fsd, zoneIIP, xattrs, + EnumSet.of(XAttrSetFlag.REPLACE)); + return xattrs; + } + + static XAttr generateNewXAttrForReencryptionFinish(final INodesInPath iip, + final ZoneReencryptionStatus status) throws IOException { + final ZoneEncryptionInfoProto zoneProto = getZoneEncryptionInfoProto(iip); + final ReencryptionInfoProto newRiProto = PBHelperClient + .convert(status.getEzKeyVersionName(), status.getSubmissionTime(), + status.isCanceled(), status.getFilesReencrypted(), + status.getNumReencryptionFailures(), Time.now(), null); + + final ZoneEncryptionInfoProto newZoneProto = PBHelperClient + .convert(PBHelperClient.convert(zoneProto.getSuite()), + PBHelperClient.convert(zoneProto.getCryptoProtocolVersion()), + zoneProto.getKeyName(), newRiProto); + + final XAttr xattr = XAttrHelper + .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, newZoneProto.toByteArray()); + return xattr; + } + + private static ZoneEncryptionInfoProto getZoneEncryptionInfoProto( + final INodesInPath iip) throws IOException { + final XAttr fileXAttr = FSDirXAttrOp + .unprotectedGetXAttrByPrefixedName(iip, CRYPTO_XATTR_ENCRYPTION_ZONE); + if (fileXAttr == null) { + throw new IOException( + "Could not find reencryption XAttr for file " + iip.getPath()); + } + try { + return ZoneEncryptionInfoProto.parseFrom(fileXAttr.getValue()); + } catch (InvalidProtocolBufferException e) { + throw new IOException( + "Could not parse file encryption info for " + "inode " + iip + .getPath(), e); + } + } 
+ + /** + * Save the batch's edeks to file xattrs. + */ + static void saveFileXAttrsForBatch(FSDirectory fsd, + List<FileEdekInfo> batch) { + assert fsd.getFSNamesystem().hasWriteLock(); + if (batch != null && !batch.isEmpty()) { + for (FileEdekInfo entry : batch) { + final INode inode = fsd.getInode(entry.getInodeId()); + Preconditions.checkNotNull(inode); + fsd.getEditLog().logSetXAttrs(inode.getFullPathName(), + inode.getXAttrFeature().getXAttrs(), false); + } + } + } + /** * Set the FileEncryptionInfo for an INode. * * @param fsd fsdirectory - * @param src the path of a directory which will be the root of the - * encryption zone. * @param info file encryption information + * @param flag action when setting xattr. Either CREATE or REPLACE. * @throws IOException */ static void setFileEncryptionInfo(final FSDirectory fsd, - final INodesInPath iip, final FileEncryptionInfo info) - throws IOException { + final INodesInPath iip, final FileEncryptionInfo info, + final XAttrSetFlag flag) throws IOException { // Make the PB for the xattr final HdfsProtos.PerFileEncryptionInfoProto proto = PBHelperClient.convertPerFileEncInfo(info); @@ -238,8 +435,7 @@ final class FSDirEncryptionZoneOp { xAttrs.add(fileEncryptionAttr); fsd.writeLock(); try { - FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs, - EnumSet.of(XAttrSetFlag.CREATE)); + FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs, EnumSet.of(flag)); } finally { fsd.writeUnlock(); } @@ -500,4 +696,34 @@ final class FSDirEncryptionZoneOp { this.edek = edek; } } + + /** + * Get the last key version name for the given EZ. This will contact + * the KMS to getKeyVersions. + * @param zone the encryption zone + * @param pc the permission checker + * @return the last element from the list of keyVersionNames returned by KMS. 
+ * @throws IOException + */ + static KeyVersion getLatestKeyVersion(final FSDirectory dir, + final String zone, final FSPermissionChecker pc) throws IOException { + final EncryptionZone ez; + assert dir.getProvider() != null; + dir.readLock(); + try { + final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.READ); + if (iip.getLastINode() == null) { + throw new FileNotFoundException(zone + " does not exist."); + } + dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), iip.getPath()); + ez = FSDirEncryptionZoneOp.getEZForPath(dir, iip); + } finally { + dir.readUnlock(); + } + // Contact KMS out of locks. + KeyVersion currKv = dir.getProvider().getCurrentKey(ez.getKeyName()); + Preconditions.checkNotNull(currKv, + "No current key versions for key name " + ez.getKeyName()); + return currKv; + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 7ab05d7..012e916 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -397,7 +398,8 @@ class FSDirWriteFileOp { newNode.getFileUnderConstructionFeature().getClientName(), newNode.getId()); if (feInfo != null) { - FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo); + FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo, + XAttrSetFlag.CREATE); } setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist); fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java index ddc088c..acdade7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; import org.apache.hadoop.security.AccessControlException; @@ -275,6 +276,12 @@ class FSDirXAttrOp { PBHelperClient.convert(ezProto.getSuite()), PBHelperClient.convert(ezProto.getCryptoProtocolVersion()), ezProto.getKeyName()); + + if (ezProto.hasReencryptionProto()) { + ReencryptionInfoProto reProto = ezProto.getReencryptionProto(); + fsd.ezManager.getReencryptionStatus() + .updateZoneStatus(inode.getId(), iip.getPath(), reProto); + } } if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 87b1156..e6aa533 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -50,6 +50,7 @@ import 
org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; @@ -1358,12 +1359,18 @@ public class FSDirectory implements Closeable { } try { final HdfsProtos.ZoneEncryptionInfoProto ezProto = - HdfsProtos.ZoneEncryptionInfoProto.parseFrom( - xattr.getValue()); + HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue()); ezManager.unprotectedAddEncryptionZone(inode.getId(), PBHelperClient.convert(ezProto.getSuite()), PBHelperClient.convert(ezProto.getCryptoProtocolVersion()), ezProto.getKeyName()); + if (ezProto.hasReencryptionProto()) { + final ReencryptionInfoProto reProto = ezProto.getReencryptionProto(); + // The inode's parents may not be loaded if this is done during fsimage + // loading, so the full path cannot be set now. Pass in null to indicate that. 
+ ezManager.getReencryptionStatus() + .updateZoneStatus(inode.getId(), null, reProto); + } } catch (InvalidProtocolBufferException e) { NameNode.LOG.warn("Error parsing protocol buffer of " + "EZ XAttr " + xattr.getName() + " dir:" + inode.getFullPathName()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2313335..12d96d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -89,9 +89,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.hdfs.protocol.BlocksStats; import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.util.Time.now; @@ -199,6 +201,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import 
org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.IllegalECPolicyException; @@ -1230,6 +1233,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, dir.updateCountForQuota(); // Enable quota checks. dir.enableQuotaChecks(); + dir.ezManager.startReencryptThreads(); + if (haEnabled) { // Renew all of the leases before becoming active. // This is because, while we were in standby mode, @@ -1321,6 +1326,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // so that the tailer starts from the right spot. getFSImage().updateLastAppliedTxIdFromWritten(); } + if (dir != null) { + dir.ezManager.stopReencryptThread(); + } if (cacheManager != null) { cacheManager.stopMonitorThread(); cacheManager.clearDirectiveStats(); @@ -7031,6 +7039,84 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } + void reencryptEncryptionZone(final String zone, final ReencryptAction action, + final boolean logRetryCache) throws IOException { + boolean success = false; + try { + Preconditions.checkNotNull(zone, "zone is null."); + checkSuperuserPrivilege(); + checkOperation(OperationCategory.WRITE); + checkNameNodeSafeMode("NameNode in safemode, cannot " + action + + " re-encryption on zone " + zone); + reencryptEncryptionZoneInt(zone, action, logRetryCache); + success = true; + } finally { + logAuditEvent(success, action + "reencryption", zone, null, null); + } + } + + BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus( + final long prevId) throws IOException { + final String operationName = "listReencryptionStatus"; + boolean success = false; + checkSuperuserPrivilege(); + checkOperation(OperationCategory.READ); + readLock(); + try { + checkSuperuserPrivilege(); + checkOperation(OperationCategory.READ); + final 
BatchedListEntries<ZoneReencryptionStatus> ret = + FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId); + success = true; + return ret; + } finally { + readUnlock(operationName); + logAuditEvent(success, operationName, null); + } + } + + private void reencryptEncryptionZoneInt(final String zone, + final ReencryptAction action, final boolean logRetryCache) + throws IOException { + if (getProvider() == null) { + throw new IOException("No key provider configured, re-encryption " + + "operation is rejected"); + } + FSPermissionChecker pc = getPermissionChecker(); + // get keyVersionName out of the lock. This keyVersionName will be used + // as the target keyVersion for the entire re-encryption. + // This means all edek's keyVersion will be compared with this one, and + // kms is only contacted if the edek's keyVersion is different. + final KeyVersion kv = + FSDirEncryptionZoneOp.getLatestKeyVersion(dir, zone, pc); + provider.invalidateCache(kv.getName()); + writeLock(); + try { + checkSuperuserPrivilege(); + checkOperation(OperationCategory.WRITE); + checkNameNodeSafeMode( + "NameNode in safemode, cannot " + action + " re-encryption on zone " + + zone); + switch (action) { + case START: + FSDirEncryptionZoneOp + .reencryptEncryptionZone(dir, zone, kv.getVersionName(), + logRetryCache); + break; + case CANCEL: + FSDirEncryptionZoneOp + .cancelReencryptEncryptionZone(dir, zone, logRetryCache); + break; + default: + throw new IOException( + "Re-encryption action " + action + " is not supported"); + } + } finally { + writeUnlock(); + } + getEditLog().logSync(); + } + /** * Set an erasure coding policy on the given path. * @param srcArg The path of the target directory. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 7871202..3fbb7bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSLimitException; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -116,6 +117,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.BlocksStats; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -2052,6 +2054,31 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol + public void reencryptEncryptionZone(final String zone, + final ReencryptAction action) 
throws IOException { + checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); + final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; + } + boolean success = false; + try { + namesystem.reencryptEncryptionZone(zone, action, cacheEntry != null); + success = true; + } finally { + RetryCache.setState(cacheEntry, success); + } + } + + @Override // ClientProtocol + public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus( + final long prevId) throws IOException { + checkNNStartup(); + return namesystem.listReencryptionStatus(prevId); + } + + @Override // ClientProtocol public void setErasureCodingPolicy(String src, String ecPolicyName) throws IOException { checkNNStartup(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java new file mode 100644 index 0000000..729b894 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -0,0 +1,940 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.ReencryptionStatus; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo; +import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask; +import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker; +import org.apache.hadoop.hdfs.util.ReadOnlyList; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY; + +/** + * Class for handling re-encrypt EDEK operations. + * <p> + * For each EZ, ReencryptionHandler walks the tree in a depth-first order, + * and submits batches of (files + existing edeks) as re-encryption tasks + * to a thread pool. Each thread in the pool then contacts the KMS to + * re-encrypt the edeks. ReencryptionUpdater tracks the tasks and updates + * file xattrs with the new edeks. + * <p> + * File renames are disabled in the EZ that's being re-encrypted. Newly created + * files will have new edeks, because the edek cache is drained upon the + * submission of a re-encryption command. + * <p> + * It is assumed only 1 ReencryptionHandler will be running, because: + * 1. 
The bottleneck of the entire re-encryption appears to be on the KMS. + * 2. Even with multiple handlers, since updater requires writelock and is + * single-threaded, the performance gain is limited. + * <p> + * This class uses the FSDirectory lock for synchronization. + */ +@InterfaceAudience.Private +public class ReencryptionHandler implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ReencryptionHandler.class); + + // 2000 is based on buffer size = 512 * 1024, and SetXAttr op size is + // 100 - 200 bytes (depending on the xattr value). + // The buffer size is hard-coded, see outputBufferCapacity from QJM. + private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000; + + private final EncryptionZoneManager ezManager; + private final FSDirectory dir; + private final long interval; + private final int reencryptBatchSize; + private double throttleLimitHandlerRatio; + private final int reencryptThreadPoolSize; + // stopwatches for throttling + private final StopWatch throttleTimerAll = new StopWatch(); + private final StopWatch throttleTimerLocked = new StopWatch(); + + private ExecutorCompletionService<ReencryptionTask> batchService; + private BlockingQueue<Runnable> taskQueue; + // protected by ReencryptionHandler object lock + private final Map<Long, ZoneSubmissionTracker> submissions = + new ConcurrentHashMap<>(); + + // The current batch that the handler is working on. Handler is designed to + // be single-threaded, see class javadoc for more details. + private ReencryptionBatch currentBatch; + + private final ReencryptionUpdater reencryptionUpdater; + private ExecutorService updaterExecutor; + + // Vars for unit tests. + private volatile boolean shouldPauseForTesting = false; + private volatile int pauseAfterNthSubmission = 0; + + /** + * Stop the re-encryption updater thread, as well as all EDEK re-encryption + * tasks submitted. 
+ */ + void stopThreads() { + assert dir.hasWriteLock(); + for (ZoneSubmissionTracker zst : submissions.values()) { + zst.cancelAllTasks(); + } + if (updaterExecutor != null) { + updaterExecutor.shutdownNow(); + } + } + + /** + * Start the re-encryption updater thread. + */ + void startUpdaterThread() { + updaterExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("reencryptionUpdaterThread #%d").build()); + updaterExecutor.execute(reencryptionUpdater); + } + + @VisibleForTesting + synchronized void pauseForTesting() { + shouldPauseForTesting = true; + LOG.info("Pausing re-encrypt handler for testing."); + notify(); + } + + @VisibleForTesting + synchronized void resumeForTesting() { + shouldPauseForTesting = false; + LOG.info("Resuming re-encrypt handler for testing."); + notify(); + } + + @VisibleForTesting + void pauseForTestingAfterNthSubmission(final int count) { + assert pauseAfterNthSubmission == 0; + pauseAfterNthSubmission = count; + } + + @VisibleForTesting + void pauseUpdaterForTesting() { + reencryptionUpdater.pauseForTesting(); + } + + @VisibleForTesting + void resumeUpdaterForTesting() { + reencryptionUpdater.resumeForTesting(); + } + + @VisibleForTesting + void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) { + reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count); + } + + private synchronized void checkPauseForTesting() throws InterruptedException { + assert !dir.hasReadLock(); + assert !dir.getFSNamesystem().hasReadLock(); + while (shouldPauseForTesting) { + LOG.info("Sleeping in the re-encrypt handler for unit test."); + wait(); + LOG.info("Continuing re-encrypt handler after pausing."); + } + } + + ReencryptionHandler(final EncryptionZoneManager ezMgr, + final Configuration conf) { + this.ezManager = ezMgr; + Preconditions.checkNotNull(ezManager.getProvider(), + "No provider set, cannot re-encrypt"); + this.dir = ezMgr.getFSDirectory(); + this.interval = + 
conf.getTimeDuration(DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY, + DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + Preconditions.checkArgument(interval > 0, + DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY + " is not positive."); + this.reencryptBatchSize = conf.getInt(DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY, + DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT); + Preconditions.checkArgument(reencryptBatchSize > 0, + DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY + " is not positive."); + if (reencryptBatchSize > MAX_BATCH_SIZE_WITHOUT_FLOODING) { + LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer " + + "to be full and trigger a logSync within the writelock, greatly " + + "impacting namenode throughput.", reencryptBatchSize); + } + this.throttleLimitHandlerRatio = + conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY, + DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT); + LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption", + throttleLimitHandlerRatio); + Preconditions.checkArgument(throttleLimitHandlerRatio > 0.0f, + DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY + + " is not positive."); + this.reencryptThreadPoolSize = + conf.getInt(DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY, + DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT); + + taskQueue = new LinkedBlockingQueue<>(); + ThreadPoolExecutor threadPool = + new ThreadPoolExecutor(reencryptThreadPoolSize, reencryptThreadPoolSize, + 60, TimeUnit.SECONDS, taskQueue, new Daemon.DaemonFactory() { + private final AtomicInteger ind = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("reencryption edek Thread-" + ind.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + + @Override + public void rejectedExecution(Runnable runnable, + ThreadPoolExecutor e) { + LOG.info("Execution rejected, executing in current thread"); + 
super.rejectedExecution(runnable, e); + } + }); + + threadPool.allowCoreThreadTimeOut(true); + this.batchService = new ExecutorCompletionService(threadPool); + reencryptionUpdater = + new ReencryptionUpdater(dir, batchService, this, conf); + currentBatch = new ReencryptionBatch(reencryptBatchSize); + } + + ReencryptionStatus getReencryptionStatus() { + return ezManager.getReencryptionStatus(); + } + + void cancelZone(final long zoneId, final String zoneName) throws IOException { + assert dir.hasWriteLock(); + final ZoneReencryptionStatus zs = + getReencryptionStatus().getZoneStatus(zoneId); + if (zs == null || zs.getState() == State.Completed) { + throw new IOException("Zone " + zoneName + " is not under re-encryption"); + } + zs.cancel(); + ZoneSubmissionTracker zst = submissions.get(zoneId); + if (zst != null) { + zst.cancelAllTasks(); + } + } + + void removeZone(final long zoneId) { + assert dir.hasWriteLock(); + LOG.info("Removing zone {} from re-encryption.", zoneId); + ZoneSubmissionTracker zst = submissions.get(zoneId); + if (zst != null) { + zst.cancelAllTasks(); + } + submissions.remove(zoneId); + getReencryptionStatus().removeZone(zoneId); + } + + ZoneSubmissionTracker getTracker(final long zoneId) { + dir.hasReadLock(); + return unprotectedGetTracker(zoneId); + } + + /** + * get the tracker without holding the FSDirectory lock. This is only used for + * testing, when updater checks about pausing. + */ + ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) { + return submissions.get(zoneId); + } + + /** + * Add a dummy tracker (with 1 task that has 0 files to re-encrypt) + * for the zone. This is necessary to complete the re-encryption in case + * no file in the entire zone needs re-encryption at all. We cannot simply + * update zone status and set zone xattrs, because in the handler we only hold + * readlock, and setting xattrs requires upgrading to a writelock. 
+   *
+   * @param zoneId Id of the EZ inode.
+   */
+  void addDummyTracker(final long zoneId) {
+    assert dir.hasReadLock();
+    assert !submissions.containsKey(zoneId);
+    final ZoneSubmissionTracker zst = new ZoneSubmissionTracker();
+    // Nothing else will be submitted for this zone: the empty batch below is
+    // its only task.
+    zst.setSubmissionDone();
+
+    Future future = batchService.submit(
+        new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
+    zst.addTask(future);
+    submissions.put(zoneId, zst);
+  }
+
+  /**
+   * Main loop. It takes at most 1 zone per scan, and executes until the zone
+   * is completed.
+   * See {@link #reencryptEncryptionZone(long)}.
+   */
+  @Override
+  public void run() {
+    LOG.info("Starting up re-encrypt thread with interval={} millisecond.",
+        interval);
+    while (true) {
+      try {
+        // Sleep until the interval elapses or notify() is called on this
+        // handler.
+        synchronized (this) {
+          wait(interval);
+        }
+        checkPauseForTesting();
+      } catch (InterruptedException ie) {
+        LOG.info("Re-encrypt handler interrupted. Exiting");
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      // Pick the next zone while holding the FSDirectory read lock; null
+      // means there is nothing to do in this round.
+      final Long zoneId;
+      dir.readLock();
+      try {
+        zoneId = getReencryptionStatus().getNextUnprocessedZone();
+        if (zoneId == null) {
+          // empty queue.
+          continue;
+        }
+        LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
+            zoneId, getReencryptionStatus());
+      } finally {
+        dir.readUnlock();
+      }
+
+      try {
+        reencryptEncryptionZone(zoneId);
+      } catch (RetriableException | SafeModeException re) {
+        // The zone is marked for retry; the loop keeps running.
+        LOG.info("Re-encryption caught exception, will retry", re);
+        getReencryptionStatus().markZoneForRetry(zoneId);
+      } catch (IOException ioe) {
+        // Logged only; the loop keeps running.
+        LOG.warn("IOException caught when re-encrypting zone {}", zoneId, ioe);
+      } catch (InterruptedException ie) {
+        LOG.info("Re-encrypt handler interrupted. Exiting.");
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Throwable t) {
+        // Anything unexpected terminates the handler thread.
+        LOG.error("Re-encrypt handler thread exiting. Exception caught when"
+            + " re-encrypting zone {}.", zoneId, t);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Re-encrypts a zone by recursively iterating all paths inside the zone,
+   * in lexicographic order.
+ * Files are re-encrypted, and subdirs are processed during iteration. + * + * @param zoneId the Zone's id. + * @throws IOException + * @throws InterruptedException + */ + void reencryptEncryptionZone(final long zoneId) + throws IOException, InterruptedException { + throttleTimerAll.reset().start(); + throttleTimerLocked.reset(); + final INode zoneNode; + final ZoneReencryptionStatus zs; + + readLock(); + try { + getReencryptionStatus().markZoneStarted(zoneId); + zoneNode = dir.getInode(zoneId); + // start re-encrypting the zone from the beginning + if (zoneNode == null) { + LOG.info("Directory with id {} removed during re-encrypt, skipping", + zoneId); + return; + } + if (!zoneNode.isDirectory()) { + LOG.info("Cannot re-encrypt directory with id {} because it's not a" + + " directory.", zoneId); + return; + } + + zs = getReencryptionStatus().getZoneStatus(zoneId); + assert zs != null; + // Only costly log FullPathName here once, and use id elsewhere. + LOG.info("Re-encrypting zone {}(id={})", zoneNode.getFullPathName(), + zoneId); + if (zs.getLastCheckpointFile() == null) { + // new re-encryption + reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME, + zs.getEzKeyVersionName()); + } else { + // resuming from a past re-encryption + restoreFromLastProcessedFile(zoneId, zs); + } + // save the last batch and mark complete + submitCurrentBatch(zoneId); + LOG.info("Submission completed of zone {} for re-encryption.", zoneId); + reencryptionUpdater.markZoneSubmissionDone(zoneId); + } finally { + readUnlock(); + } + } + + List<XAttr> completeReencryption(final INode zoneNode) throws IOException { + assert dir.hasWriteLock(); + assert dir.getFSNamesystem().hasWriteLock(); + final Long zoneId = zoneNode.getId(); + ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(zoneId); + assert zs != null; + LOG.info("Re-encryption completed on zone {}. 
Re-encrypted {} files," + + " failures encountered: {}.", zoneNode.getFullPathName(), + zs.getFilesReencrypted(), zs.getNumReencryptionFailures()); + // This also removes the zone from reencryptionStatus + submissions.remove(zoneId); + return FSDirEncryptionZoneOp + .updateReencryptionFinish(dir, INodesInPath.fromINode(zoneNode), zs); + } + + /** + * Restore the re-encryption from the progress inside ReencryptionStatus. + * This means start from exactly the lastProcessedFile (LPF), skipping all + * earlier paths in lexicographic order. Lexicographically-later directories + * on the LPF parent paths are added to subdirs. + */ + private void restoreFromLastProcessedFile(final long zoneId, + final ZoneReencryptionStatus zs) + throws IOException, InterruptedException { + final INodeDirectory parent; + final byte[] startAfter; + final INodesInPath lpfIIP = + dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ); + parent = lpfIIP.getLastINode().getParent(); + startAfter = lpfIIP.getLastINode().getLocalNameBytes(); + reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName()); + } + + /** + * Iterate through all files directly inside parent, and recurse down + * directories. The listing is done in batch, and can optionally start after + * a position. + * <p> + * Each batch is then send to the threadpool, where KMS will be contacted and + * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed + * from the threadpool. + * <p> + * The iteration of the inode tree is done in a depth-first fashion. But + * instead of holding all INodeDirectory's in memory on the fly, only the + * path components to the current inode is held. This is to reduce memory + * consumption. + * + * @param parent The inode id of parent directory + * @param zoneId Id of the EZ inode + * @param startAfter Full path of a file the re-encrypt should start after. 
+ * @throws IOException + * @throws InterruptedException + */ + private void reencryptDir(final INodeDirectory parent, final long zoneId, + byte[] startAfter, final String ezKeyVerName) + throws IOException, InterruptedException { + List<byte[]> startAfters = new ArrayList<>(); + if (parent == null) { + return; + } + INode curr = parent; + // construct startAfters all the way up to the zone inode. + startAfters.add(startAfter); + while (curr.getId() != zoneId) { + startAfters.add(0, curr.getLocalNameBytes()); + curr = curr.getParent(); + } + curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName); + while (!startAfters.isEmpty()) { + if (curr == null) { + // lock was reacquired, re-resolve path. + curr = resolvePaths(zoneId, startAfters); + } + curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName); + } + } + + /** + * Resolve the cursor of re-encryption to an inode. + * <p> + * The parent of the lowest level startAfter is returned. If somewhere in the + * middle of startAfters changed, the parent of the lowest unchanged level is + * returned. + * + * @param zoneId Id of the EZ inode. + * @param startAfters the cursor, represented by a list of path bytes. + * @return the parent inode corresponding to the startAfters, or null if + * the EZ node (furthest parent) is deleted. + */ + private INode resolvePaths(final long zoneId, List<byte[]> startAfters) + throws IOException { + // If the readlock was reacquired, we need to resolve the paths again + // in case things have changed. If our cursor file/dir is changed, + // continue from the next one. + INode zoneNode = dir.getInode(zoneId); + if (zoneNode == null) { + throw new FileNotFoundException("Zone " + zoneId + " is deleted."); + } + INodeDirectory parent = zoneNode.asDirectory(); + for (int i = 0; i < startAfters.size(); ++i) { + if (i == startAfters.size() - 1) { + // last startAfter does not need to be resolved, since search for + // nextChild will cover that automatically. 
+ break; + } + INode curr = + parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID); + if (curr == null) { + // inode at this level has changed. Update startAfters to point to + // the next dir at the parent level (and dropping any startAfters + // at lower levels). + for (; i < startAfters.size(); ++i) { + startAfters.remove(startAfters.size() - 1); + } + break; + } + parent = curr.asDirectory(); + } + return parent; + } + + /** + * Submit the current batch to the thread pool. + * + * @param zoneId Id of the EZ INode + * @throws IOException + * @throws InterruptedException + */ + private void submitCurrentBatch(final long zoneId) + throws IOException, InterruptedException { + assert dir.hasReadLock(); + if (currentBatch.isEmpty()) { + return; + } + ZoneSubmissionTracker zst = submissions.get(zoneId); + if (zst == null) { + zst = new ZoneSubmissionTracker(); + submissions.put(zoneId, zst); + } + Future future = batchService + .submit(new EDEKReencryptCallable(zoneId, currentBatch, this)); + zst.addTask(future); + LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", + currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); + currentBatch = new ReencryptionBatch(reencryptBatchSize); + // flip the pause flag if this is nth submission. + // The actual pause need to happen outside of the lock. + if (pauseAfterNthSubmission > 0) { + if (--pauseAfterNthSubmission == 0) { + shouldPauseForTesting = true; + } + } + } + + final class ReencryptionBatch { + // First file's path, for logging purpose. 
+    private String firstFilePath;
+    // Files (with their existing edeks) collected for this batch.
+    private final List<FileEdekInfo> batch;
+
+    // Creates a batch pre-sized to the handler's configured batch size.
+    ReencryptionBatch() {
+      this(reencryptBatchSize);
+    }
+
+    ReencryptionBatch(int initialCapacity) {
+      batch = new ArrayList<>(initialCapacity);
+    }
+
+    /**
+     * Add a file to the batch. The full path of the first file added is
+     * remembered for logging. The caller must hold the FSDirectory read lock.
+     *
+     * @param inode the file to add, must not be null.
+     * @throws IOException
+     */
+    void add(final INodeFile inode) throws IOException {
+      assert dir.hasReadLock();
+      Preconditions.checkNotNull(inode, "INodeFile is null");
+      if (batch.isEmpty()) {
+        firstFilePath = inode.getFullPathName();
+      }
+      batch.add(new FileEdekInfo(dir, inode));
+    }
+
+    String getFirstFilePath() {
+      return firstFilePath;
+    }
+
+    boolean isEmpty() {
+      return batch.isEmpty();
+    }
+
+    int size() {
+      return batch.size();
+    }
+
+    void clear() {
+      batch.clear();
+    }
+
+    // Returns the backing list itself, not a copy.
+    List<FileEdekInfo> getBatch() {
+      return batch;
+    }
+  }
+
+  /**
+   * Simply contacts the KMS for re-encryption. No NN locks held.
+   */
+  private static class EDEKReencryptCallable
+      implements Callable<ReencryptionTask> {
+    private final long zoneNodeId;
+    private final ReencryptionBatch batch;
+    private final ReencryptionHandler handler;
+
+    EDEKReencryptCallable(final long zoneId,
+        final ReencryptionBatch currentBatch, final ReencryptionHandler rh) {
+      zoneNodeId = zoneId;
+      batch = currentBatch;
+      handler = rh;
+    }
+
+    /**
+     * Re-encrypt the edeks of the batch.
+     *
+     * @return a task carrying the batch; its failure count is 0 for an empty
+     *         or successful batch, and the batch size if the KMS call
+     *         failed.
+     */
+    @Override
+    public ReencryptionTask call() {
+      LOG.info("Processing batched re-encryption for zone {}, batch size {},"
+          + " start:{}", zoneNodeId, batch.size(), batch.getFirstFilePath());
+      if (batch.isEmpty()) {
+        return new ReencryptionTask(zoneNodeId, 0, batch);
+      }
+      final Stopwatch kmsSW = new Stopwatch().start();
+
+      int numFailures = 0;
+      String result = "Completed";
+      if (!reencryptEdeks()) {
+        // The batch fails as a whole: every file in it counts as a failure.
+        numFailures += batch.size();
+        result = "Failed to";
+      }
+      LOG.info("{} re-encrypting one batch of {} edeks from KMS,"
+          + " time consumed: {}, start: {}.", result,
+          batch.size(), kmsSW.stop(), batch.getFirstFilePath());
+      return new ReencryptionTask(zoneNodeId, numFailures, batch);
+    }
+
+    /**
+     * Send the existing edeks of the batch to the key provider and store
+     * the results back on the batch entries.
+     *
+     * @return true on success, false if the provider call threw.
+     */
+    private boolean reencryptEdeks() {
+      // communicate with the kms out of lock
+      final List<EncryptedKeyVersion> edeks = new ArrayList<>(batch.size());
+      for (FileEdekInfo entry : batch.getBatch()) {
+        edeks.add(entry.getExistingEdek());
+      }
+      // provider already has LoadBalancingKMSClientProvider's retries. If that
+      // fails, just fail this callable.
+      // NOTE(review): presumably the provider replaces the list elements in
+      // place, since the results are read back from the same list below -
+      // confirm against KeyProviderCryptoExtension.
+      try {
+        handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
+        EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
+      } catch (GeneralSecurityException | IOException ex) {
+        LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}",
+            batch.size(), batch.getFirstFilePath(), ex);
+        return false;
+      }
+      // edeks was filled in batch order, so index i matches the i-th entry.
+      int i = 0;
+      for (FileEdekInfo entry : batch.getBatch()) {
+        assert i < edeks.size();
+        entry.setEdek(edeks.get(i++));
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Iterates the parent directory, and add direct children files to
+   * current batch. If batch size meets configured threshold, a Callable
+   * is created and sent to the thread pool, which will communicate to the KMS
+   * to get new edeks.
+   * <p>
+   * Locks could be released and reacquired when a Callable is created.
+   *
+   * @param zoneId Id of the EZ INode
+   * @return The inode which was just processed, if lock is held in the entire
+   * process. Null if lock is released.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private INode reencryptDirInt(final long zoneId, INode curr,
+      List<byte[]> startAfters, final String ezKeyVerName)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    assert dir.getFSNamesystem().hasReadLock();
+    Preconditions.checkNotNull(curr, "Current inode can't be null");
+    checkZoneReady(zoneId);
+    final INodeDirectory parent =
+        curr.isDirectory() ?
curr.asDirectory() : curr.getParent(); + ReadOnlyList<INode> children = + parent.getChildrenList(Snapshot.CURRENT_STATE_ID); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-encrypting directory {}", parent.getFullPathName()); + } + + final byte[] startAfter = startAfters.get(startAfters.size() - 1); + boolean lockReleased = false; + for (int i = INodeDirectory.nextChild(children, startAfter); + i < children.size(); ++i) { + final INode inode = children.get(i); + if (!reencryptINode(inode, ezKeyVerName)) { + // inode wasn't added for re-encryption. Recurse down if it's a dir, + // skip otherwise. + if (!inode.isDirectory()) { + continue; + } + if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) { + // nested EZ, ignore. + LOG.info("{}({}) is a nested EZ, skipping for re-encryption", + inode.getFullPathName(), inode.getId()); + continue; + } + // add 1 level to the depth-first search. + curr = inode; + if (!startAfters.isEmpty()) { + startAfters.remove(startAfters.size() - 1); + startAfters.add(curr.getLocalNameBytes()); + } + startAfters.add(HdfsFileStatus.EMPTY_NAME); + return lockReleased ? null : curr; + } + if (currentBatch.size() >= reencryptBatchSize) { + final byte[] currentStartAfter = inode.getLocalNameBytes(); + final String parentPath = parent.getFullPathName(); + submitCurrentBatch(zoneId); + lockReleased = true; + readUnlock(); + try { + throttle(); + checkPauseForTesting(); + } finally { + readLock(); + } + checkZoneReady(zoneId); + + // Things could have changed when the lock was released. + // Re-resolve the parent inode. + FSPermissionChecker pc = dir.getPermissionChecker(); + INode newParent = + dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ) + .getLastINode(); + if (newParent == null || !newParent.equals(parent)) { + // parent dir is deleted or recreated. We're done. 
+ return null; + } + children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID); + // -1 to counter the ++ on the for loop + i = INodeDirectory.nextChild(children, currentStartAfter) - 1; + } + } + // Successfully finished this dir, adjust pointers to 1 level up, and + // startAfter this dir. + startAfters.remove(startAfters.size() - 1); + if (!startAfters.isEmpty()) { + startAfters.remove(startAfters.size() - 1); + startAfters.add(curr.getLocalNameBytes()); + } + curr = curr.getParent(); + return lockReleased ? null : curr; + } + + private void readLock() { + dir.getFSNamesystem().readLock(); + dir.readLock(); + throttleTimerLocked.start(); + } + + private void readUnlock() { + dir.readUnlock(); + dir.getFSNamesystem().readUnlock("reencryptHandler"); + throttleTimerLocked.stop(); + } + + /** + * Throttles the ReencryptionHandler in 3 aspects: + * 1. Prevents generating more Callables than the CPU could possibly handle. + * 2. Prevents generating more Callables than the ReencryptionUpdater can + * handle, under its own throttling + * 3. Prevents contending FSN/FSD read locks. This is done based on the + * DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration. + * <p> + * Item 1 and 2 are to control NN heap usage. + * + * @throws InterruptedException + */ + @VisibleForTesting + void throttle() throws InterruptedException { + // 1. + final int numCores = Runtime.getRuntime().availableProcessors(); + if (taskQueue.size() >= numCores) { + LOG.debug("Re-encryption handler throttling because queue size {} is" + + "larger than number of cores {}", taskQueue.size(), numCores); + while (taskQueue.size() >= numCores) { + Thread.sleep(100); + } + } + + // 2. if tasks are piling up on the updater, don't create new callables + // until the queue size goes down. 
+ final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2; + int totalTasks = 0; + for (ZoneSubmissionTracker zst : submissions.values()) { + totalTasks += zst.getTasks().size(); + } + if (totalTasks >= maxTasksPiled) { + LOG.debug("Re-encryption handler throttling because total tasks pending" + + " re-encryption updater is {}", totalTasks); + while (totalTasks >= maxTasksPiled) { + Thread.sleep(500); + totalTasks = 0; + for (ZoneSubmissionTracker zst : submissions.values()) { + totalTasks += zst.getTasks().size(); + } + } + } + + // 3. + if (throttleLimitHandlerRatio >= 1.0) { + return; + } + final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS) + * throttleLimitHandlerRatio); + final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-encryption handler throttling expect: {}, actual: {}," + + " throttleTimerAll:{}", expect, actual, + throttleTimerAll.now(TimeUnit.MILLISECONDS)); + } + if (expect - actual < 0) { + // in case throttleLimitHandlerRatio is very small, expect will be 0. + // so sleepMs should not be calculated from expect, to really meet the + // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs + // should be 1000 - throttleTimerAll.now() + final long sleepMs = + (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll + .now(TimeUnit.MILLISECONDS); + LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs); + Thread.sleep(sleepMs); + } + throttleTimerAll.reset().start(); + throttleTimerLocked.reset(); + } + + /** + * Process an Inode for re-encryption. Add to current batch if it's a file, + * no-op otherwise. + * + * @param inode the inode + * @return true if inode is added to currentBatch and should be re-encrypted. + * false otherwise: could be inode is not a file, or inode's edek's + * key version is not changed. 
+ * @throws IOException + * @throws InterruptedException + */ + private boolean reencryptINode(final INode inode, final String ezKeyVerName) + throws IOException, InterruptedException { + dir.hasReadLock(); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for re-encryption", inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + FileEncryptionInfo feInfo = FSDirEncryptionZoneOp + .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode)); + if (feInfo == null) { + LOG.warn("File {} skipped re-encryption because it is not encrypted! " + + "This is very likely a bug.", inode.getId()); + return false; + } + if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("File {} skipped re-encryption because edek's key version" + + " name is not changed.", inode.getFullPathName()); + } + return false; + } + currentBatch.add(inode.asFile()); + return true; + } + + /** + * Check whether zone is ready for re-encryption. Throws IOE if it's not. + * 1. If EZ is deleted. + * 2. if the re-encryption is canceled. + * 3. If NN is not active or is in safe mode. + * + * @throws IOException if zone does not exist / is cancelled, or if NN is not + * ready for write. + */ + void checkZoneReady(final long zoneId) + throws RetriableException, SafeModeException, IOException { + final ZoneReencryptionStatus zs = + getReencryptionStatus().getZoneStatus(zoneId); + if (zs == null) { + throw new IOException("Zone " + zoneId + " status cannot be found."); + } + if (zs.isCanceled()) { + throw new IOException("Re-encryption is canceled for zone " + zoneId); + } + dir.getFSNamesystem() + .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt."); + // re-encryption should be cancelled when NN goes to standby. Just + // double checking for sanity. + dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE); + } + + /** + * Called when a new zone is submitted for re-encryption. 
This will interrupt + * the background thread if it's waiting for the next + * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY. + */ + synchronized void notifyNewSubmission() { + LOG.debug("Notifying handler for new re-encryption command."); + this.notify(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java new file mode 100644 index 0000000..690a0e9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; +import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY; + +/** + * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with + * edeks returned from reencryption. + * <p> + * The tasks are submitted by ReencryptionHandler. + * <p> + * It is assumed only 1 Updater will be running, since updating file xattrs + * requires namespace write lock, and performance gain from multi-threading + * is limited. 
+ */ +@InterfaceAudience.Private +public final class ReencryptionUpdater implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ReencryptionUpdater.class); + + private volatile boolean shouldPauseForTesting = false; + private volatile int pauseAfterNthCheckpoint = 0; + private volatile long pauseZoneId = 0; + + private double throttleLimitRatio; + private final StopWatch throttleTimerAll = new StopWatch(); + private final StopWatch throttleTimerLocked = new StopWatch(); + + private volatile long faultRetryInterval = 60000; + + /** + * Class to track re-encryption submissions of a single zone. It contains + * all the submitted futures, and statistics about how far the futures are + * processed. + */ + static final class ZoneSubmissionTracker { + private boolean submissionDone; + private LinkedList<Future> tasks; + private int numCheckpointed; + private int numFutureDone; + + ZoneSubmissionTracker() { + submissionDone = false; + tasks = new LinkedList<>(); + numCheckpointed = 0; + numFutureDone = 0; + } + + LinkedList<Future> getTasks() { + return tasks; + } + + void cancelAllTasks() { + if (!tasks.isEmpty()) { + LOG.info("Cancelling {} re-encryption tasks", tasks.size()); + for (Future f : tasks) { + f.cancel(true); + } + } + } + + void addTask(final Future task) { + tasks.add(task); + } + + private boolean isCompleted() { + return submissionDone && tasks.isEmpty(); + } + + void setSubmissionDone() { + submissionDone = true; + } + } + + /** + * Class representing the task for one batch of a re-encryption command. It + * also contains statistics about how far this single batch has been executed. 
+ */ + static final class ReencryptionTask { + private final long zoneId; + private boolean processed = false; + private int numFilesUpdated = 0; + private int numFailures = 0; + private String lastFile = null; + private final ReencryptionBatch batch; + + ReencryptionTask(final long id, final int failures, + final ReencryptionBatch theBatch) { + zoneId = id; + numFailures = failures; + batch = theBatch; + } + } + + /** + * Class that encapsulates re-encryption details of a file. It contains the + * file inode, stores the initial edek of the file, and the new edek + * after re-encryption. + * <p> + * Assumptions are the object initialization happens when dir lock is held, + * and inode is valid and is encrypted during initialization. + * <p> + * Namespace changes may happen during re-encryption, and if inode is changed + * the re-encryption is skipped. + */ + static final class FileEdekInfo { + private final long inodeId; + private final EncryptedKeyVersion existingEdek; + private EncryptedKeyVersion edek = null; + + FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException { + assert dir.hasReadLock(); + Preconditions.checkNotNull(inode, "INodeFile is null"); + inodeId = inode.getId(); + final FileEncryptionInfo fei = FSDirEncryptionZoneOp + .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode)); + Preconditions.checkNotNull(fei, + "FileEncryptionInfo is null for " + inodeId); + existingEdek = EncryptedKeyVersion + .createForDecryption(fei.getKeyName(), fei.getEzKeyVersionName(), + fei.getIV(), fei.getEncryptedDataEncryptionKey()); + } + + long getInodeId() { + return inodeId; + } + + EncryptedKeyVersion getExistingEdek() { + return existingEdek; + } + + void setEdek(final EncryptedKeyVersion ekv) { + assert ekv != null; + edek = ekv; + } + } + + @VisibleForTesting + synchronized void pauseForTesting() { + shouldPauseForTesting = true; + LOG.info("Pausing re-encrypt updater for testing."); + notify(); + } + + @VisibleForTesting + synchronized void 
resumeForTesting() { + shouldPauseForTesting = false; + LOG.info("Resuming re-encrypt updater for testing."); + notify(); + } + + @VisibleForTesting + void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) { + assert pauseAfterNthCheckpoint == 0; + pauseAfterNthCheckpoint = count; + pauseZoneId = zoneId; + } + + private final FSDirectory dir; + private final CompletionService<ReencryptionTask> batchService; + private final ReencryptionHandler handler; + + ReencryptionUpdater(final FSDirectory fsd, + final CompletionService<ReencryptionTask> service, + final ReencryptionHandler rh, final Configuration conf) { + dir = fsd; + batchService = service; + handler = rh; + this.throttleLimitRatio = + conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY, + DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT); + Preconditions.checkArgument(throttleLimitRatio > 0.0f, + DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY + + " is not positive."); + } + + /** + * Called by the submission thread to indicate all tasks have been submitted. + * If this is called but no tasks has been submitted, the re-encryption is + * considered complete. + * + * @param zoneId Id of the zone inode. + * @throws IOException + * @throws InterruptedException + */ + void markZoneSubmissionDone(final long zoneId) + throws IOException, InterruptedException { + final ZoneSubmissionTracker tracker = handler.getTracker(zoneId); + if (tracker != null) { + tracker.submissionDone = true; + } else { + // Caller thinks submission is done, but no tasks submitted - meaning + // no files in the EZ need to be re-encrypted. Complete directly. + handler.addDummyTracker(zoneId); + } + } + + @Override + public void run() { + throttleTimerAll.start(); + while (true) { + try { + // Assuming single-threaded updater. + takeAndProcessTasks(); + } catch (InterruptedException ie) { + LOG.warn("Re-encryption updater thread interrupted. 
Exiting."); + Thread.currentThread().interrupt(); + return; + } catch (IOException ioe) { + LOG.warn("Re-encryption updater thread exception.", ioe); + } catch (Throwable t) { + LOG.error("Re-encryption updater thread exiting.", t); + return; + } + } + } + + /** + * Process a completed ReencryptionTask. Each inode id is resolved to an INode + * object, skip if the inode is deleted. + * <p> + * Only file xattr is updated by this method. Re-encryption progress is not + * updated. + * + * @param zoneNodePath full path of the EZ inode. + * @param task the completed task. + * @throws IOException + * @throws InterruptedException + */ + private void processTaskEntries(final String zoneNodePath, + final ReencryptionTask task) throws IOException, InterruptedException { + assert dir.hasWriteLock(); + if (!task.batch.isEmpty() && task.numFailures == 0) { + LOG.debug( + "Updating file xattrs for re-encrypting zone {}," + " starting at {}", + zoneNodePath, task.batch.getFirstFilePath()); + for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator(); + it.hasNext();) { + FileEdekInfo entry = it.next(); + // resolve the inode again, and skip if it's doesn't exist + LOG.trace("Updating {} for re-encryption.", entry.getInodeId()); + final INode inode = dir.getInode(entry.getInodeId()); + if (inode == null) { + LOG.debug("INode {} doesn't exist, skipping re-encrypt.", + entry.getInodeId()); + // also remove from batch so later it's not saved. + it.remove(); + continue; + } + + // Cautiously check file encryption info, and only update if we're sure + // it's still using the same edek. 
+ Preconditions.checkNotNull(entry.edek); + final FileEncryptionInfo fei = FSDirEncryptionZoneOp + .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode)); + if (!fei.getKeyName().equals(entry.edek.getEncryptionKeyName())) { + LOG.debug("Inode {} EZ key changed, skipping re-encryption.", + entry.getInodeId()); + it.remove(); + continue; + } + if (fei.getEzKeyVersionName() + .equals(entry.edek.getEncryptionKeyVersionName())) { + LOG.debug( + "Inode {} EZ key version unchanged, skipping re-encryption.", + entry.getInodeId()); + it.remove(); + continue; + } + if (!Arrays.equals(fei.getEncryptedDataEncryptionKey(), + entry.existingEdek.getEncryptedKeyVersion().getMaterial())) { + LOG.debug("Inode {} existing edek changed, skipping re-encryption", + entry.getInodeId()); + it.remove(); + continue; + } + FileEncryptionInfo newFei = new FileEncryptionInfo(fei.getCipherSuite(), + fei.getCryptoProtocolVersion(), + entry.edek.getEncryptedKeyVersion().getMaterial(), + entry.edek.getEncryptedKeyIv(), fei.getKeyName(), + entry.edek.getEncryptionKeyVersionName()); + final INodesInPath iip = INodesInPath.fromINode(inode); + FSDirEncryptionZoneOp + .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE); + task.lastFile = iip.getPath(); + ++task.numFilesUpdated; + } + + LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption," + + " starting:{}.", task.numFilesUpdated, task.batch.size(), + zoneNodePath, task.batch.getFirstFilePath()); + } + task.processed = true; + } + + /** + * Iterate tasks for the given zone, and update progress accordingly. The + * checkpoint indicates all files before it are done re-encryption, so it will + * be updated to the position where all tasks before are completed. + * + * @param zoneNode the EZ inode. + * @param tracker the zone submission tracker. + * @return the list containing the last checkpointed xattr. Empty if + * no checkpoint happened. 
+ * @throws ExecutionException + * @throws IOException + * @throws InterruptedException + */ + private List<XAttr> processCheckpoints(final INode zoneNode, + final ZoneSubmissionTracker tracker) + throws ExecutionException, IOException, InterruptedException { + assert dir.hasWriteLock(); + final long zoneId = zoneNode.getId(); + final String zonePath = zoneNode.getFullPathName(); + final ZoneReencryptionStatus status = + handler.getReencryptionStatus().getZoneStatus(zoneId); + assert status != null; + // always start from the beginning, because the checkpoint means all files + // before it are re-encrypted. + final LinkedList<Future> tasks = tracker.getTasks(); + final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); + ListIterator<Future> iter = tasks.listIterator(); + while (iter.hasNext()) { + Future<ReencryptionTask> curr = iter.next(); + if (!curr.isDone() || !curr.get().processed) { + // still has earlier tasks not completed, skip here. + break; + } + ReencryptionTask task = curr.get(); + LOG.debug("Updating re-encryption checkpoint with completed task." 
+ + " last: {} size:{}.", task.lastFile, task.batch.size()); + assert zoneId == task.zoneId; + try { + final XAttr xattr = FSDirEncryptionZoneOp + .updateReencryptionProgress(dir, zoneNode, status, task.lastFile, + task.numFilesUpdated, task.numFailures); + xAttrs.clear(); + xAttrs.add(xattr); + } catch (IOException ie) { + LOG.warn("Failed to update re-encrypted progress to xattr for zone {}", + zonePath, ie); + ++task.numFailures; + } + ++tracker.numCheckpointed; + iter.remove(); + } + if (tracker.isCompleted()) { + LOG.debug("Removed re-encryption tracker for zone {} because it completed" + + " with {} tasks.", zonePath, tracker.numCheckpointed); + return handler.completeReencryption(zoneNode); + } + return xAttrs; + } + + private void takeAndProcessTasks() throws Exception { + final Future<ReencryptionTask> completed = batchService.take(); + throttle(); + checkPauseForTesting(); + ReencryptionTask task = completed.get(); + if (completed.isCancelled()) { + LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}", + task.zoneId, task.lastFile); + } + + boolean shouldRetry; + do { + dir.getFSNamesystem().writeLock(); + try { + throttleTimerLocked.start(); + processTask(task); + shouldRetry = false; + } catch (RetriableException | SafeModeException re) { + // Keep retrying until succeed. + LOG.info("Exception when processing re-encryption task for zone {}, " + + "retrying...", task.zoneId, re); + shouldRetry = true; + Thread.sleep(faultRetryInterval); + } catch (IOException ioe) { + LOG.warn("Failure processing re-encryption task for zone {}", + task.zoneId, ioe); + ++task.numFailures; + task.processed = true; + shouldRetry = false; + } finally { + dir.getFSNamesystem().writeUnlock("reencryptUpdater"); + throttleTimerLocked.stop(); + } + // logSync regardless, to prevent edit log buffer overflow triggering + // logSync inside FSN writelock. 
+ dir.getEditLog().logSync(); + } while (shouldRetry); + } + + private void processTask(ReencryptionTask task) + throws InterruptedException, ExecutionException, IOException { + final List<XAttr> xAttrs; + final String zonePath; + dir.writeLock(); + try { + handler.checkZoneReady(task.zoneId); + final INode zoneNode = dir.getInode(task.zoneId); + if (zoneNode == null) { + // ez removed. + return; + } + zonePath = zoneNode.getFullPathName(); + LOG.info("Processing returned re-encryption task for zone {}({}), " + + "batch size {}, start:{}", zonePath, task.zoneId, + task.batch.size(), task.batch.getFirstFilePath()); + final ZoneSubmissionTracker tracker = + handler.getTracker(zoneNode.getId()); + Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath); + tracker.numFutureDone++; + EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask(); + processTaskEntries(zonePath, task); + EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint(); + xAttrs = processCheckpoints(zoneNode, tracker); + } finally { + dir.writeUnlock(); + } + FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch()); + if (!xAttrs.isEmpty()) { + dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false); + } + } + + private synchronized void checkPauseForTesting() throws InterruptedException { + assert !dir.hasWriteLock(); + assert !dir.getFSNamesystem().hasWriteLock(); + if (pauseAfterNthCheckpoint != 0) { + ZoneSubmissionTracker tracker = + handler.unprotectedGetTracker(pauseZoneId); + if (tracker != null) { + if (tracker.numFutureDone == pauseAfterNthCheckpoint) { + shouldPauseForTesting = true; + pauseAfterNthCheckpoint = 0; + } + } + } + while (shouldPauseForTesting) { + LOG.info("Sleeping in the re-encryption updater for unit test."); + wait(); + LOG.info("Continuing re-encryption updater after pausing."); + } + } + + /** + * Throttles the ReencryptionUpdater to prevent from contending FSN/FSD write + * locks. 
This is done by the configuration. + */ + private void throttle() throws InterruptedException { + if (throttleLimitRatio >= 1.0) { + return; + } + + final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS) + * throttleLimitRatio); + final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-encryption updater throttling expect: {}, actual: {}," + + " throttleTimerAll:{}", expect, actual, + throttleTimerAll.now(TimeUnit.MILLISECONDS)); + } + if (expect - actual < 0) { + // in case throttleLimitHandlerRatio is very small, expect will be 0. + // so sleepMs should not be calculated from expect, to really meet the + // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs + // should be 1000 - throttleTimerAll.now() + final long sleepMs = + (long) (actual / throttleLimitRatio) - throttleTimerAll + .now(TimeUnit.MILLISECONDS); + LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs); + Thread.sleep(sleepMs); + } + throttleTimerAll.reset().start(); + throttleTimerLocked.reset(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org