HDFS-13731. ReencryptionUpdater fails with ConcurrentModificationException during processCheckpoints. Contributed by Zsolt Venczel.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3e18b957 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3e18b957 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3e18b957 Branch: refs/heads/HDFS-12090 Commit: 3e18b957ebdf20925224ab9c28e6c2f4b6bbdb24 Parents: c5629d5 Author: Zsolt Venczel <zvenc...@cloudera.com> Authored: Tue Aug 28 15:11:58 2018 -0700 Committer: Xiao Chen <x...@apache.org> Committed: Tue Aug 28 15:13:43 2018 -0700 ---------------------------------------------------------------------- .../server/namenode/ReencryptionHandler.java | 6 +-- .../server/namenode/ReencryptionUpdater.java | 52 ++++++++++---------- 2 files changed, 30 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e18b957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java index c8c8d68..a8acccd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -714,10 +714,10 @@ public class ReencryptionHandler implements Runnable { zst = new ZoneSubmissionTracker(); submissions.put(zoneId, zst); } + Future future = batchService.submit(new EDEKReencryptCallable(zoneId, + currentBatch, reencryptionHandler)); + zst.addTask(future); } - Future future = batchService.submit(new EDEKReencryptCallable(zoneId, - currentBatch, reencryptionHandler)); - zst.addTask(future); LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); currentBatch = new ReencryptionBatch(reencryptBatchSize); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e18b957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java index a5923a7..15cfa92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java @@ -383,32 +383,34 @@ public final class ReencryptionUpdater implements Runnable { final LinkedList<Future> tasks = tracker.getTasks(); final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); ListIterator<Future> iter = tasks.listIterator(); - while (iter.hasNext()) { - Future<ReencryptionTask> curr = iter.next(); - if (curr.isCancelled()) { - break; - } - if (!curr.isDone() || !curr.get().processed) { - // still has earlier tasks not completed, skip here. - break; - } - ReencryptionTask task = curr.get(); - LOG.debug("Updating re-encryption checkpoint with completed task." - + " last: {} size:{}.", task.lastFile, task.batch.size()); - assert zoneId == task.zoneId; - try { - final XAttr xattr = FSDirEncryptionZoneOp - .updateReencryptionProgress(dir, zoneNode, status, task.lastFile, - task.numFilesUpdated, task.numFailures); - xAttrs.clear(); - xAttrs.add(xattr); - } catch (IOException ie) { - LOG.warn("Failed to update re-encrypted progress to xattr for zone {}", - zonePath, ie); - ++task.numFailures; + synchronized (handler) { + while (iter.hasNext()) { + Future<ReencryptionTask> curr = iter.next(); + if (curr.isCancelled()) { + break; + } + if (!curr.isDone() || !curr.get().processed) { + // still has earlier tasks not completed, skip here. + break; + } + ReencryptionTask task = curr.get(); + LOG.debug("Updating re-encryption checkpoint with completed task." + + " last: {} size:{}.", task.lastFile, task.batch.size()); + assert zoneId == task.zoneId; + try { + final XAttr xattr = FSDirEncryptionZoneOp + .updateReencryptionProgress(dir, zoneNode, status, task.lastFile, + task.numFilesUpdated, task.numFailures); + xAttrs.clear(); + xAttrs.add(xattr); + } catch (IOException ie) { + LOG.warn("Failed to update re-encrypted progress to xattr" + + " for zone {}", zonePath, ie); + ++task.numFailures; + } + ++tracker.numCheckpointed; + iter.remove(); } - ++tracker.numCheckpointed; - iter.remove(); } if (tracker.isCompleted()) { LOG.debug("Removed re-encryption tracker for zone {} because it completed" --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org