This is an automated email from the ASF dual-hosted git repository. shv pushed a commit to branch fgl in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit d6efb60f498f81befc2e88c6f9f1929dd765e951 Author: Xing Lin <xing...@linkedin.com> AuthorDate: Fri Jul 16 13:04:59 2021 -0700 HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197) --- .../java/org/apache/hadoop/util/LatchLock.java | 4 +- .../org/apache/hadoop/util/PartitionedGSet.java | 35 ++- .../apache/hadoop/util/TestPartitionedGSet.java | 270 +++++++++++++++++++++ .../hadoop/hdfs/server/namenode/INodeMap.java | 4 +- 4 files changed, 300 insertions(+), 13 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java index 41e33da..fd98391 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java @@ -30,7 +30,7 @@ public abstract class LatchLock<C> { protected abstract boolean isReadTopLocked(); /** @return true topLock is locked for write by any thread */ protected abstract boolean isWriteTopLocked(); - protected abstract void readTopdUnlock(); + protected abstract void readTopUnlock(); protected abstract void writeTopUnlock(); protected abstract boolean hasReadChildLock(); @@ -46,7 +46,7 @@ public abstract class LatchLock<C> { // Public APIs to use with the class public void readLock() { readChildLock(); - readTopdUnlock(); + readTopUnlock(); } public void readUnlock() { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java index 7ebb1b3..f3569cc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java @@ -24,7 +24,7 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; - +import java.util.NoSuchElementException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.util.LightWeightGSet.LinkedElement; @@ -79,8 +79,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { public PartitionedGSet(final int capacity, final Comparator<? super K> comparator, - final LatchLock<?> latchLock, - final E rootKey) { + final LatchLock<?> latchLock) { this.partitions = new TreeMap<K, PartitionEntry>(comparator); this.latchLock = latchLock; // addNewPartition(rootKey).put(rootKey); @@ -275,17 +274,36 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { * modifying other partitions, while iterating through the current one. */ private class EntryIterator implements Iterator<E> { - private final Iterator<K> keyIterator; + private Iterator<K> keyIterator; private Iterator<E> partitionIterator; public EntryIterator() { keyIterator = partitions.keySet().iterator(); - K curKey = partitions.firstKey(); - partitionIterator = getPartition(curKey).iterator(); + + if (!keyIterator.hasNext()) { + partitionIterator = null; + return; + } + + K firstKey = keyIterator.next(); + partitionIterator = partitions.get(firstKey).iterator(); } @Override public boolean hasNext() { + + // Special case: an iterator was created for an empty PartitionedGSet. + // Check whether new partitions have been added since then. + if (partitionIterator == null) { + if (partitions.size() == 0) { + return false; + } else { + keyIterator = partitions.keySet().iterator(); + K nextKey = keyIterator.next(); + partitionIterator = partitions.get(nextKey).iterator(); + } + } + while(!partitionIterator.hasNext()) { if(!keyIterator.hasNext()) { return false; @@ -298,9 +316,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { @Override public E next() { - while(!partitionIterator.hasNext()) { - K curKey = keyIterator.next(); - partitionIterator = getPartition(curKey).iterator(); + if (!hasNext()) { + throw new NoSuchElementException("No more elements in this set."); } return partitionIterator.next(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java new file mode 100644 index 0000000..9ae772c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.util.LightWeightGSet.LinkedElement; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Testing {@link PartitionedGSet} */ +public class TestPartitionedGSet { + public static final Logger LOG = + LoggerFactory.getLogger(TestPartitionedGSet.class); + private static final int ELEMENT_NUM = 100; + + /** + * Generate positive random numbers for testing. We want to use only positive + * numbers because the smallest partition used in testing is 0. + * + * @param length + * number of random numbers to be generated. + * + * @param randomSeed + * seed to be used for random number generator. + * + * @return + * An array of Integers + */ + private static ArrayList<Integer> getRandomList(int length, int randomSeed) { + Random random = new Random(randomSeed); + ArrayList<Integer> list = new ArrayList<Integer>(length); + for (int i = 0; i < length; i++) { + list.add(random.nextInt(Integer.MAX_VALUE)); + } + return list; + } + + private static class TestElement implements LinkedElement { + private final int val; + private LinkedElement next; + + TestElement(int val) { + this.val = val; + this.next = null; + } + + public int getVal() { + return val; + } + + @Override + public void setNext(LinkedElement next) { + this.next = next; + } + + @Override + public LinkedElement getNext() { + return next; + } + } + + private static class TestElementComparator implements Comparator<TestElement> + { + @Override + public int compare(TestElement e1, TestElement e2) { + if (e1 == null || e2 == null) { + throw new NullPointerException("Cannot compare null elements"); + } + + return e1.getVal() - e2.getVal(); + } + } + + protected ReentrantReadWriteLock topLock = + new ReentrantReadWriteLock(false); + /** + * We are NOT testing any concurrent access to a PartitionedGSet here. + */ + private class NoOpLock extends LatchLock<ReentrantReadWriteLock> { + private ReentrantReadWriteLock childLock; + + public NoOpLock() { + childLock = new ReentrantReadWriteLock(false); + } + + @Override + protected boolean isReadTopLocked() { + return topLock.getReadLockCount() > 0 || isWriteTopLocked(); + } + + @Override + protected boolean isWriteTopLocked() { + return topLock.isWriteLocked(); + } + + @Override + protected void readTopUnlock() { + topLock.readLock().unlock(); + } + + @Override + protected void writeTopUnlock() { + topLock.writeLock().unlock(); + } + + @Override + protected boolean hasReadChildLock() { + return childLock.getReadLockCount() > 0 || hasWriteChildLock(); + } + + @Override + protected void readChildLock() { + childLock.readLock().lock(); + } + + @Override + protected void readChildUnlock() { + childLock.readLock().unlock(); + } + + @Override + protected boolean hasWriteChildLock() { + return childLock.isWriteLockedByCurrentThread(); + } + + @Override + protected void writeChildLock() { + childLock.writeLock().lock(); + } + + @Override + protected void writeChildUnlock() { + childLock.writeLock().unlock(); + } + + @Override + protected LatchLock<ReentrantReadWriteLock> clone() { + return new NoOpLock(); + } + } + + /** + * Test iterator for a PartitionedGSet with no partitions. + */ + @Test(timeout=60000) + public void testIteratorForNoPartition() { + PartitionedGSet<TestElement, TestElement> set = + new PartitionedGSet<TestElement, TestElement>( + 16, new TestElementComparator(), new NoOpLock()); + + topLock.readLock().lock(); + int count = 0; + Iterator<TestElement> iter = set.iterator(); + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(0, count); + } + + /** + * Test iterator for a PartitionedGSet with empty partitions. + */ + @Test(timeout=60000) + public void testIteratorForEmptyPartitions() { + PartitionedGSet<TestElement, TestElement> set = + new PartitionedGSet<TestElement, TestElement>( + 16, new TestElementComparator(), new NoOpLock()); + + set.addNewPartition(new TestElement(0)); + set.addNewPartition(new TestElement(1000)); + set.addNewPartition(new TestElement(2000)); + + topLock.readLock().lock(); + int count = 0; + Iterator<TestElement> iter = set.iterator(); + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(0, count); + } + + /** + * Test whether the iterator can return the same number of elements as stored + * into the PartitionedGSet. + */ + @Test(timeout=60000) + public void testIteratorCountElements() { + ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123); + PartitionedGSet<TestElement, TestElement> set = + new PartitionedGSet<TestElement, TestElement>( + 16, new TestElementComparator(), new NoOpLock()); + + set.addNewPartition(new TestElement(0)); + set.addNewPartition(new TestElement(1000)); + set.addNewPartition(new TestElement(2000)); + + topLock.writeLock().lock(); + for (Integer i : list) { + set.put(new TestElement(i)); + } + topLock.writeLock().unlock(); + + topLock.readLock().lock(); + int count = 0; + Iterator<TestElement> iter = set.iterator(); + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(ELEMENT_NUM, count); + } + + /** + * Test iterator when it is created before partitions/elements are + * added to the PartitionedGSet. + */ + @Test(timeout=60000) + public void testIteratorAddElementsAfterIteratorCreation() { + PartitionedGSet<TestElement, TestElement> set = + new PartitionedGSet<TestElement, TestElement>( + 16, new TestElementComparator(), new NoOpLock()); + + // Create the iterator before partitions are added. + Iterator<TestElement> iter = set.iterator(); + + set.addNewPartition(new TestElement(0)); + set.addNewPartition(new TestElement(1000)); + set.addNewPartition(new TestElement(2000)); + + // Added one element + topLock.writeLock().lock(); + set.put(new TestElement(2500)); + topLock.writeLock().unlock(); + + topLock.readLock().lock(); + int count = 0; + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(1, count); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java index 3b07dce..a0253b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java @@ -121,7 +121,7 @@ public class INodeMap { } @Override - protected void readTopdUnlock() { + protected void readTopUnlock() { namesystem.getFSLock().readUnlock("INodeMap", null, false); } @@ -194,7 +194,7 @@ public class INodeMap { // Compute the map capacity by allocating 1% of total memory int capacity = LightWeightGSet.computeCapacity(1, "INodeMap"); this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(), - new INodeMapLock(), rootDir); + new INodeMapLock()); // Pre-populate initial empty partitions PartitionedGSet<INode, INodeWithAdditionalFields> pgs = --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org