This is an automated email from the ASF dual-hosted git repository.
hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6cdc379583 HDDS-9710. Missing snapshot entries list Snapshot under a bucket API (#5619)
6cdc379583 is described below
commit 6cdc3795836c115858130ea540d17a7bdf976723
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Dec 13 15:48:59 2023 +0530
HDDS-9710. Missing snapshot entries list Snapshot under a bucket API (#5619)
---
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 4 +
.../org/apache/hadoop/ozone/om/ListIterator.java | 340 +++++++++++++++++
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 74 +---
.../hadoop/ozone/om/OzoneListStatusHelper.java | 421 +++------------------
.../hadoop/ozone/om/TestOmMetadataManager.java | 24 +-
.../ozone/om/request/OMRequestTestUtils.java | 6 +-
6 files changed, 438 insertions(+), 431 deletions(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 63602aa287..7e4cfa0086 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1432,6 +1432,10 @@ public class KeyManagerImpl implements KeyManager {
&& omKeyInfoCacheValue.getCacheValue() == null;
}
+ public static boolean isKeyInCache(String key, Table keyTable) {
+ return keyTable.getCacheValue(new CacheKey(key)) != null;
+ }
+
/**
* Helper function for listStatus to find key in TableCache.
*/
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ListIterator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ListIterator.java
new file mode 100644
index 0000000000..0b2639e16b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ListIterator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Common class to do listing of resources after merging
+ * rocksDB table cache & actual table.
+ */
+public class ListIterator {
+
+ /**
+ * Interface for iteration of Heap Entries.
+ */
+ public interface ClosableIterator extends Iterator<HeapEntry>, Closeable {
+
+ }
+
+ /**
+ * Entry to be added to the heap.
+ */
+ public static class HeapEntry implements Comparable<HeapEntry> {
+ private final int entryIteratorId;
+ private final String tableName;
+ private final String key;
+ private final Object value;
+
+ HeapEntry(int entryIteratorId, String tableName, String key,
+ Object value) {
+ this.entryIteratorId = entryIteratorId;
+ this.tableName = tableName;
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return this.key;
+ }
+
+ private int getEntryIteratorId() {
+ return this.entryIteratorId;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public int compareTo(HeapEntry other) {
+ return Comparator.comparing(HeapEntry::getKey)
+ .thenComparing(HeapEntry::getEntryIteratorId).compare(this, other);
+ }
+
+ public boolean equals(Object other) {
+
+ if (!(other instanceof HeapEntry)) {
+ return false;
+ }
+
+
+ HeapEntry that = (HeapEntry) other;
+ return this.compareTo(that) == 0;
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+ }
+
+ /**
+ * Iterator for DB entries from a given rocksDB table.
+ */
+ public static class DbTableIter<Value> implements
+ ClosableIterator {
+ private final int entryIteratorId;
+ private final TableIterator<String,
+ ? extends Table.KeyValue<String, Value>> tableIterator;
+
+ private final Table<String, Value> table;
+ private HeapEntry currentKey;
+
+ DbTableIter(int entryIteratorId, Table<String, Value> table,
+ String prefixKey, String startKey) throws IOException {
+ this.entryIteratorId = entryIteratorId;
+ this.table = table;
+ this.tableIterator = table.iterator(prefixKey);
+ this.currentKey = null;
+
+ // only seek for the start key if the start key is lexicographically
+ // after the prefix key. For example
+ // Prefix key = 1024/c, Start key = 1024/a
+ // then do not seek for the start key
+ //
+ // on the other hand,
+ // Prefix key = 1024/a, Start key = 1024/c
+ // then seek for the start key
+ if (!StringUtils.isBlank(startKey) &&
+ startKey.compareTo(prefixKey) > 0) {
+ tableIterator.seek(startKey);
+ }
+ }
+
+ private void getNextKey() throws IOException {
+ while (tableIterator.hasNext() && currentKey == null) {
+ Table.KeyValue<String, Value> entry = tableIterator.next();
+ String entryKey = entry.getKey();
+ if (!KeyManagerImpl.isKeyInCache(entryKey, table)) {
+ currentKey = new HeapEntry(entryIteratorId,
+ table.getName(), entryKey, entry.getValue());
+ }
+ }
+ }
+
+ public boolean hasNext() {
+ try {
+ getNextKey();
+ } catch (IOException t) {
+ throw new UncheckedIOException(t);
+ }
+ return currentKey != null;
+ }
+
+ public HeapEntry next() {
+ if (hasNext()) {
+ HeapEntry ret = currentKey;
+ currentKey = null;
+ return ret;
+ }
+ throw new NoSuchElementException();
+ }
+
+ public void close() throws IOException {
+ tableIterator.close();
+ }
+ }
+
+ /**
+ * Iterator for Cache entries in a Dir and File Table.
+ */
+ public static class CacheIter<Value>
+ implements ClosableIterator {
+ private final Map<String, Value> cacheKeyMap;
+
+ private final Iterator<Map.Entry<String, Value>>
+ cacheCreatedKeyIter;
+ private final String prefixKey;
+ private final String startKey;
+ private final String tableName;
+
+ private final int entryIteratorId;
+
+ CacheIter(int entryIteratorId, String tableName,
+ Iterator<Map.Entry<CacheKey<String>,
+ CacheValue<Value>>> cacheIter, String startKey,
+ String prefixKey) {
+ this.cacheKeyMap = new TreeMap<>();
+
+ this.startKey = startKey;
+ this.prefixKey = prefixKey;
+ this.tableName = tableName;
+ this.entryIteratorId = entryIteratorId;
+
+ populateCacheMap(cacheIter);
+
+ cacheCreatedKeyIter = cacheKeyMap.entrySet().iterator();
+ }
+
+ private void populateCacheMap(Iterator<Map.Entry<CacheKey<String>,
+ CacheValue<Value>>> cacheIter) {
+ while (cacheIter.hasNext()) {
+ Map.Entry<CacheKey<String>, CacheValue<Value>> entry =
+ cacheIter.next();
+ String cacheKey = entry.getKey().getCacheKey();
+ Value cacheOmInfo = entry.getValue().getCacheValue();
+
+ // Copy cache value to local copy and work on it
+ if (cacheOmInfo instanceof CopyObject) {
+ cacheOmInfo = ((CopyObject<Value>) cacheOmInfo).copyObject();
+ }
+ if (StringUtils.isBlank(startKey)) {
+ // startKey is null or empty, then the seekKeyInDB="1024/"
+ if (cacheKey.startsWith(prefixKey)) {
+ cacheKeyMap.put(cacheKey, cacheOmInfo);
+ }
+ } else {
+ // startKey not empty, then the seekKeyInDB="1024/b" and
+ // seekKeyInDBWithOnlyParentID = "1024/". This is to avoid case of
+ // parentID with "102444" cache entries.
+ // Here, it has to list all the keys after "1024/b" and requires >=0
+ // string comparison.
+ if (cacheKey.startsWith(prefixKey) &&
+ cacheKey.compareTo(startKey) >= 0) {
+ cacheKeyMap.put(cacheKey, cacheOmInfo);
+ }
+ }
+ }
+ }
+
+ public boolean hasNext() {
+ return cacheCreatedKeyIter.hasNext();
+ }
+
+ public HeapEntry next() {
+ Map.Entry<String, Value> entry = cacheCreatedKeyIter.next();
+ return new HeapEntry(this.entryIteratorId, this.tableName,
+ entry.getKey(), entry.getValue());
+ }
+
+ public void close() {
+ // Nothing to close here
+ }
+ }
+
+ /**
+ * Implement lexicographical sorting of the file status by sorting file status
+ * across multiple lists. Each of these lists are sorted internally.
+ *
+ * This class implements sorted output by implementing a min heap based
+ * iterator where the initial element from each of sorted list is inserted.
+ *
+ * The least entry is removed and the next entry from the same list from
+ * which the entry is removed is added into the list.
+ *
+ * For example
+ * RawDir - a1, a3, a5, a7
+ * RawFile - a2, a4, a6, a8
+ *
+ * Min Heap is initially composed of {(a1, RawDir), (a2, RawFile)}
+ * The least element is removed i.e a1 and then next entry from RawDir
+ * is inserted into minheap resulting in {(a2, RawFile), (a3, RawDir)}
+ *
+ * This process is repeated till both the lists are exhausted.
+ */
+ public static class MinHeapIterator implements ClosableIterator {
+ private final PriorityQueue<HeapEntry> minHeap = new PriorityQueue<>();
+ private final ArrayList<ClosableIterator> iterators = new ArrayList<>();
+
+ MinHeapIterator(OMMetadataManager omMetadataManager, String prefixKey,
+ BucketLayout bucketLayout, String startKey,
+ String volumeName, String bucketName) throws IOException {
+
+ this(omMetadataManager, prefixKey, startKey, volumeName,
+ bucketName, omMetadataManager.getDirectoryTable(),
+ omMetadataManager.getKeyTable(bucketLayout));
+ }
+
+ MinHeapIterator(OMMetadataManager omMetadataManager, String prefixKey,
+ String startKey, String volumeName, String bucketName,
+ Table... tables) throws IOException {
+ omMetadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
+ bucketName);
+ try {
+ int iteratorId = 0;
+ for (Table table : tables) {
+ iterators.add(new CacheIter<>(iteratorId, table.getName(),
+ table.cacheIterator(), startKey, prefixKey));
+ iteratorId++;
+ iterators.add(new DbTableIter<>(iteratorId, table, prefixKey,
+ startKey));
+ iteratorId++;
+ }
+ } finally {
+ omMetadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
+ bucketName);
+ }
+
+ // Insert the element from each of the iterator
+ for (Iterator<HeapEntry> iter : iterators) {
+ try {
+ if (iter.hasNext()) {
+ minHeap.add(iter.next());
+ }
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ }
+ }
+
+ }
+
+ public boolean hasNext() {
+ return !minHeap.isEmpty();
+ }
+
+ public HeapEntry next() {
+ HeapEntry heapEntry = minHeap.remove();
+ // remove the least element and
+ // reinsert the next element from the same iterator
+ Iterator<HeapEntry> iter = iterators.get(heapEntry.getEntryIteratorId());
+ if (iter.hasNext()) {
+ minHeap.add(iter.next());
+ }
+
+ return heapEntry;
+ }
+
+ public void close() throws IOException {
+ iterators.forEach(IOUtils::closeQuietly);
+ }
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 031a98baad..e272cb8692 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
@@ -28,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -1364,66 +1366,26 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
snapshotPrefix) ? snapshotPrefix : OM_KEY_PREFIX);
}
- TreeMap<String, SnapshotInfo> snapshotInfoMap = new TreeMap<>();
-
- int count = appendSnapshotFromCacheToMap(
- snapshotInfoMap, prefix, seek, maxListResult);
- appendSnapshotFromDBToMap(
- snapshotInfoMap, prefix, seek, count, maxListResult);
-
- return new ArrayList<>(snapshotInfoMap.values());
- }
-
- private int appendSnapshotFromCacheToMap(
- TreeMap snapshotInfoMap, String prefix,
- String previous, int maxListResult) {
- int count = 0;
- Iterator<Map.Entry<CacheKey<String>, CacheValue<SnapshotInfo>>> iterator =
- snapshotInfoTable.cacheIterator();
- while (iterator.hasNext() && count < maxListResult) {
- Map.Entry<CacheKey<String>, CacheValue<SnapshotInfo>> entry =
- iterator.next();
- String snapshotKey = entry.getKey().getCacheKey();
- SnapshotInfo snapshotInfo = entry.getValue().getCacheValue();
- if (snapshotInfo != null && snapshotKey.startsWith(prefix) &&
- snapshotKey.compareTo(previous) > 0) {
- snapshotInfoMap.put(snapshotKey, snapshotInfo);
- count++;
- }
- }
- return count;
- }
-
- private void appendSnapshotFromDBToMap(TreeMap snapshotInfoMap,
- String prefix, String previous,
- int count, int maxListResult)
- throws IOException {
- try (TableIterator<String, ? extends KeyValue<String, SnapshotInfo>>
- snapshotIter = snapshotInfoTable.iterator()) {
- KeyValue<String, SnapshotInfo> snapshotinfo;
- snapshotIter.seek(previous);
- while (snapshotIter.hasNext() && count < maxListResult) {
- snapshotinfo = snapshotIter.next();
- if (snapshotinfo != null &&
- snapshotinfo.getKey().compareTo(previous) == 0) {
- continue;
- }
- if (snapshotinfo != null && snapshotinfo.getKey().startsWith(prefix)) {
- CacheValue<SnapshotInfo> cacheValue =
- snapshotInfoTable.getCacheValue(
- new CacheKey<>(snapshotinfo.getKey()));
- // There is always the latest data in the cache, so don't need to add
- // earlier data from DB. We only add data from DB if there is no data
- // in cache.
- if (cacheValue == null) {
- snapshotInfoMap.put(snapshotinfo.getKey(), snapshotinfo.getValue());
- count++;
+ List<SnapshotInfo> snapshotInfos = Lists.newArrayList();
+ try (ListIterator.MinHeapIterator snapshotIterator =
+ new ListIterator.MinHeapIterator(this, prefix, seek, volumeName,
+ bucketName, snapshotInfoTable)) {
+ try {
+ while (snapshotIterator.hasNext() && maxListResult > 0) {
+ SnapshotInfo snapshotInfo = (SnapshotInfo) snapshotIterator.next()
+ .getValue();
+ if (!snapshotInfo.getName().equals(prevSnapshot)) {
+ snapshotInfos.add(snapshotInfo);
+ maxListResult--;
}
- } else {
- break;
}
+ } catch (NoSuchElementException e) {
+ throw new IOException(e);
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
}
+ return snapshotInfos;
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java
index 4bfe888d43..9735ea209d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneListStatusHelper.java
@@ -21,34 +21,25 @@ import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
-import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
-import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.TreeMap;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.NoSuchElementException;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
@@ -56,8 +47,6 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.om.exceptions.OMException.
ResultCodes.FILE_NOT_FOUND;
-import static org.apache.hadoop.ozone.om.lock.
- OzoneManagerLock.Resource.BUCKET_LOCK;
/**
* Helper class for fetching List Status for a path.
@@ -72,13 +61,6 @@ public class OzoneListStatusHelper {
boolean skipFileNotFoundError) throws IOException;
}
- /**
- * Interface for iteration of Heap Entries.
- */
- public interface ClosableIterator extends Iterator<HeapEntry>, Closeable {
-
- }
-
private static final Logger LOG =
LoggerFactory.getLogger(OzoneListStatusHelper.class);
@@ -208,18 +190,25 @@ public class OzoneListStatusHelper {
// fetch the sorted output using a min heap iterator where
// every remove from the heap will give the smallest entry.
- try (MinHeapIterator heapIterator = new MinHeapIterator(metadataManager,
- dbPrefixKey, bucketLayout, startKeyPrefix, volumeName, bucketName)) {
+ try (ListIterator.MinHeapIterator heapIterator =
+ new ListIterator.MinHeapIterator(metadataManager, dbPrefixKey,
+ bucketLayout, startKeyPrefix, volumeName, bucketName)) {
- while (map.size() < numEntries && heapIterator.hasNext()) {
- HeapEntry entry = heapIterator.next();
- OzoneFileStatus status = entry.getStatus(prefixKey,
- scmBlockSize, volumeName, bucketName, replication);
- // Caution: DO NOT use putIfAbsent. putIfAbsent undesirably overwrites
- // the value with `status` when the existing value in the map is null.
- if (!map.containsKey(entry.key)) {
- map.put(entry.key, status);
+ try {
+ while (map.size() < numEntries && heapIterator.hasNext()) {
+ ListIterator.HeapEntry entry = heapIterator.next();
+ OzoneFileStatus status = getStatus(prefixKey,
+ scmBlockSize, volumeName, bucketName, replication, entry);
+ // Caution: DO NOT use putIfAbsent. putIfAbsent undesirably overwrites
+ // the value with `status` when the existing value in the map is null.
+ if (!map.containsKey(entry.getKey())) {
+ map.put(entry.getKey(), status);
+ }
}
+ } catch (NoSuchElementException e) {
+ throw new IOException(e);
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
}
@@ -227,6 +216,36 @@ public class OzoneListStatusHelper {
Collectors.toList());
}
+ private OzoneFileStatus getStatus(String prefixPath, long scmBlockSz,
+ String volumeName, String bucketName,
+ ReplicationConfig bucketReplication,
+ ListIterator.HeapEntry entry) {
+ if (entry == null || entry.getValue() == null) {
+ return null;
+ }
+ Object value = entry.getValue();
+ boolean isDir =
+ OmMetadataManagerImpl.DIRECTORY_TABLE.equals(entry.getTableName());
+ OmKeyInfo keyInfo;
+ if (isDir) {
+ Preconditions.checkArgument(value instanceof OmDirectoryInfo);
+ OmDirectoryInfo dirInfo = (OmDirectoryInfo) value;
+ String dirName = OMFileRequest.getAbsolutePath(prefixPath,
+ dirInfo.getName());
+ keyInfo = OMFileRequest.getOmKeyInfo(volumeName,
+ bucketName, dirInfo, dirName);
+ keyInfo.setReplicationConfig(bucketReplication); // always overwrite
+ } else {
+ Preconditions.checkArgument(value instanceof OmKeyInfo);
+ keyInfo = (OmKeyInfo) value;
+ keyInfo.setFileName(keyInfo.getKeyName());
+ String fullKeyPath = OMFileRequest.getAbsolutePath(prefixPath,
+ keyInfo.getKeyName());
+ keyInfo.setKeyName(fullKeyPath);
+ }
+ return new OzoneFileStatus(keyInfo, scmBlockSz, isDir);
+ }
+
private String getDbKey(String key, OmKeyArgs args,
OmVolumeArgs volumeInfo,
OmBucketInfo omBucketInfo) throws IOException {
@@ -259,334 +278,4 @@ public class OzoneListStatusHelper {
}
}
- /**
- * Enum of types of entries in the heap.
- */
- public enum EntryType {
- DIR_CACHE,
- FILE_CACHE,
- RAW_DIR_DB,
- RAW_FILE_DB;
-
- public boolean isDir() {
- switch (this) {
- case DIR_CACHE:
- case RAW_DIR_DB:
- return true;
- case FILE_CACHE:
- case RAW_FILE_DB:
- return false;
- default:
- throw new IllegalArgumentException();
- }
- }
- }
-
- /**
- * Entry to be added to the heap.
- */
- private static class HeapEntry implements Comparable<HeapEntry> {
- private final EntryType entryType;
- private final String key;
- private final Object value;
-
- HeapEntry(EntryType entryType, String key, Object value) {
- Preconditions.checkArgument(value == null ||
- value instanceof OmDirectoryInfo ||
- value instanceof OmKeyInfo);
- this.entryType = entryType;
- this.key = key;
- this.value = value;
- }
-
- public int compareTo(HeapEntry other) {
- return this.key.compareTo(other.key);
- }
-
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
-
- if (!(other instanceof HeapEntry)) {
- return false;
- }
-
-
- HeapEntry that = (HeapEntry) other;
- return this.key.equals(that.key);
- }
-
- public int hashCode() {
- return key.hashCode();
- }
-
- public OzoneFileStatus getStatus(
- String prefixPath,
- long scmBlockSize,
- String volumeName,
- String bucketName,
- ReplicationConfig bucketReplication
- ) {
- if (value == null) {
- return null;
- }
- OmKeyInfo keyInfo;
- if (entryType.isDir()) {
- Preconditions.checkArgument(value instanceof OmDirectoryInfo);
- OmDirectoryInfo dirInfo = (OmDirectoryInfo) value;
- String dirName = OMFileRequest.getAbsolutePath(prefixPath,
- dirInfo.getName());
- keyInfo = OMFileRequest.getOmKeyInfo(volumeName,
- bucketName, dirInfo, dirName);
- keyInfo.setReplicationConfig(bucketReplication); // always overwrite
- } else {
- Preconditions.checkArgument(value instanceof OmKeyInfo);
- keyInfo = (OmKeyInfo) value;
- keyInfo.setFileName(keyInfo.getKeyName());
- String fullKeyPath = OMFileRequest.getAbsolutePath(prefixPath,
- keyInfo.getKeyName());
- keyInfo.setKeyName(fullKeyPath);
- }
- return new OzoneFileStatus(keyInfo, scmBlockSize, entryType.isDir());
- }
- }
-
- /**
- * Iterator for DB entries in a Dir and File Table.
- */
- private static class RawIter<Value> implements ClosableIterator {
- private final EntryType iterType;
- private final String prefixKey;
- private final TableIterator<String,
- ? extends Table.KeyValue<String, Value>> tableIterator;
-
- private final Table<String, Value> table;
- private HeapEntry currentKey;
-
- RawIter(EntryType iterType, Table<String, Value> table,
- String prefixKey, String startKey) throws IOException {
- this.iterType = iterType;
- this.table = table;
- this.tableIterator = table.iterator(prefixKey);
- this.prefixKey = prefixKey;
- this.currentKey = null;
-
- // only seek for the start key if the start key is lexicographically
- // after the prefix key. For example
- // Prefix key = 1024/c, Start key = 1024/a
- // then do not seek for the start key
- //
- // on the other hand,
- // Prefix key = 1024/a, Start key = 1024/c
- // then seek for the start key
- if (!StringUtils.isBlank(startKey) &&
- startKey.compareTo(prefixKey) > 0) {
- tableIterator.seek(startKey);
- }
-
- getNextKey();
- }
-
- private void getNextKey() throws IOException {
- while (tableIterator.hasNext() && currentKey == null) {
- Table.KeyValue<String, Value> entry = tableIterator.next();
- String entryKey = entry.getKey();
- if (entryKey.startsWith(prefixKey)) {
- if (!KeyManagerImpl.isKeyDeleted(entryKey, table)) {
- currentKey = new HeapEntry(iterType, entryKey, entry.getValue());
- }
- } else {
- // if the prefix key does not match, then break
- // as the iterator is beyond the prefix.
- break;
- }
- }
- }
-
- public boolean hasNext() {
- try {
- getNextKey();
- } catch (Throwable t) {
- throw new NoSuchElementException();
- }
- return currentKey != null;
- }
-
- public HeapEntry next() {
- try {
- getNextKey();
- } catch (Throwable t) {
- throw new NoSuchElementException();
- }
- HeapEntry ret = currentKey;
- currentKey = null;
-
- return ret;
- }
-
- public void close() throws IOException {
- tableIterator.close();
- }
- }
-
- /**
- * Iterator for Cache entries in a Dir and File Table.
- */
- private static class CacheIter<Value extends WithParentObjectId>
- implements ClosableIterator {
- private final Map<String, Value> cacheKeyMap;
- private final Iterator<Map.Entry<String, Value>>
- cacheCreatedKeyIter;
- private final Iterator<Map.Entry<CacheKey<String>, CacheValue<Value>>>
- cacheIter;
- private final String prefixKey;
- private final String startKey;
- private final EntryType entryType;
-
- CacheIter(EntryType entryType, Iterator<Map.Entry<CacheKey<String>,
- CacheValue<Value>>> cacheIter, String startKey, String prefixKey) {
- this.cacheKeyMap = new TreeMap<>();
-
- this.cacheIter = cacheIter;
- this.startKey = startKey;
- this.prefixKey = prefixKey;
- this.entryType = entryType;
-
- getCacheValues();
-
- cacheCreatedKeyIter = cacheKeyMap.entrySet().iterator();
- }
-
- private void getCacheValues() {
- while (cacheIter.hasNext()) {
- Map.Entry<CacheKey<String>, CacheValue<Value>> entry =
- cacheIter.next();
- String cacheKey = entry.getKey().getCacheKey();
- Value cacheOmInfo = entry.getValue().getCacheValue();
-
- // Copy cache value to local copy and work on it
- if (cacheOmInfo instanceof CopyObject) {
- cacheOmInfo = ((CopyObject<Value>) cacheOmInfo).copyObject();
- }
- if (StringUtils.isBlank(startKey)) {
- // startKey is null or empty, then the seekKeyInDB="1024/"
- if (cacheKey.startsWith(prefixKey)) {
- cacheKeyMap.put(cacheKey, cacheOmInfo);
- }
- } else {
- // startKey not empty, then the seekKeyInDB="1024/b" and
- // seekKeyInDBWithOnlyParentID = "1024/". This is to avoid case of
- // parentID with "102444" cache entries.
- // Here, it has to list all the keys after "1024/b" and requires >=0
- // string comparison.
- if (cacheKey.startsWith(prefixKey) &&
- cacheKey.compareTo(startKey) >= 0) {
- cacheKeyMap.put(cacheKey, cacheOmInfo);
- }
- }
- }
- }
-
- public boolean hasNext() {
- return cacheCreatedKeyIter.hasNext();
- }
-
- public HeapEntry next() {
- Map.Entry<String, Value> entry = cacheCreatedKeyIter.next();
- return new HeapEntry(entryType, entry.getKey(), entry.getValue());
- }
-
- public void close() {
- // Nothing to close here
- }
- }
-
- /**
- * Implement lexicographical sorting of the file status by sorting file status
- * across multiple lists. Each of these lists are sorted internally.
- *
- * This class implements sorted output by implementing a min heap based
- * iterator where the initial element from each of sorted list is inserted.
- *
- * The least entry is removed and the next entry from the same list from
- * which the entry is removed is added into the list.
- *
- * For example
- * RawDir - a1, a3, a5, a7
- * RawFile - a2, a4, a6, a8
- *
- * Min Heap is initially composed of {(a1, RawDir), (a2, RawFile)}
- * THe least element is removed i.e a1 and then next entry from RawDir
- * is inserted into minheap resulting in {(a2, RawFile), (a3, RawDir)}
- *
- * This process is repeated till both the lists are exhausted.
- */
- private static class MinHeapIterator implements ClosableIterator {
- private final PriorityQueue<HeapEntry> minHeap = new PriorityQueue<>();
- private final ArrayList<ClosableIterator> iterators = new ArrayList<>();
-
- MinHeapIterator(OMMetadataManager omMetadataManager, String prefixKey,
- BucketLayout bucketLayout, String startKey,
- String volumeName, String bucketName) throws IOException {
-
- omMetadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
- bucketName);
- try {
- // Initialize all the iterators
- iterators.add(EntryType.DIR_CACHE.ordinal(),
- new CacheIter<>(EntryType.DIR_CACHE,
- omMetadataManager.getDirectoryTable().cacheIterator(),
- startKey, prefixKey));
-
- iterators.add(EntryType.FILE_CACHE.ordinal(),
- new CacheIter<>(EntryType.FILE_CACHE,
- omMetadataManager.getKeyTable(bucketLayout).cacheIterator(),
- startKey, prefixKey));
-
- iterators.add(EntryType.RAW_DIR_DB.ordinal(),
- new RawIter<>(EntryType.RAW_DIR_DB,
- omMetadataManager.getDirectoryTable(),
- prefixKey, startKey));
-
- iterators.add(EntryType.RAW_FILE_DB.ordinal(),
- new RawIter<>(EntryType.RAW_FILE_DB,
- omMetadataManager.getKeyTable(bucketLayout),
- prefixKey, startKey));
- } finally {
- omMetadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
- bucketName);
- }
-
- // Insert the element from each of the iterator
- for (Iterator<HeapEntry> iter : iterators) {
- if (iter.hasNext()) {
- minHeap.add(iter.next());
- }
- }
- }
-
-
- public boolean hasNext() {
- return !minHeap.isEmpty();
- }
-
- public HeapEntry next() {
- HeapEntry heapEntry = minHeap.remove();
- // remove the least element and
- // reinsert the next element from the same iterator
- Iterator<HeapEntry> iter = iterators.get(heapEntry.entryType.ordinal());
- if (iter.hasNext()) {
- minHeap.add(iter.next());
- }
-
- return heapEntry;
- }
-
- public void close() throws IOException {
- for (ClosableIterator iterator : iterators) {
- iterator.close();
- }
- }
- }
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 58dfeb7b83..554d8104f0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -54,6 +54,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -893,13 +894,18 @@ public class TestOmMetadataManager {
addBucketsToCache(vol1, bucket1);
String prefixA = "snapshotA";
String prefixB = "snapshotB";
- TreeSet<String> snapshotsASet = new TreeSet<>();
+ TreeMap<String, SnapshotInfo> snapshotsASnapshotIDMap = new TreeMap<>();
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) {
- snapshotsASet.add(prefixA + i);
- OMRequestTestUtils.addSnapshotToTable(vol1, bucket1,
- prefixA + i, omMetadataManager);
+ snapshotsASnapshotIDMap.put(prefixA + i,
+ OMRequestTestUtils.addSnapshotToTable(vol1, bucket1,
+ prefixA + i, omMetadataManager));
+ if (i % 4 == 0) {
+ snapshotsASnapshotIDMap.put(prefixA + i,
+ OMRequestTestUtils.addSnapshotToTableCache(vol1, bucket1,
+ prefixA + i, omMetadataManager));
+ }
} else {
OMRequestTestUtils.addSnapshotToTableCache(vol1, bucket1,
prefixB + i, omMetadataManager);
@@ -921,10 +927,12 @@ public class TestOmMetadataManager {
String startSnapshot = prefixA + 38;
snapshotInfos = omMetadataManager.listSnapshot(vol1,
bucket1, prefixA, startSnapshot, 50);
- assertEquals(snapshotsASet.tailSet(startSnapshot).size() - 1,
+ assertEquals(snapshotsASnapshotIDMap.tailMap(startSnapshot).size() - 1,
snapshotInfos.size());
for (SnapshotInfo snapshotInfo : snapshotInfos) {
assertTrue(snapshotInfo.getName().startsWith(prefixA));
+ assertEquals(snapshotInfo, snapshotsASnapshotIDMap.get(
+ snapshotInfo.getName()));
assertTrue(snapshotInfo.getName().compareTo(startSnapshot) > 0);
}
@@ -937,16 +945,18 @@ public class TestOmMetadataManager {
for (SnapshotInfo snapshotInfo : snapshotInfos) {
expectedSnapshot.add(snapshotInfo.getName());
+ assertEquals(snapshotInfo, snapshotsASnapshotIDMap.get(
+ snapshotInfo.getName()));
assertTrue(snapshotInfo.getName().startsWith(prefixA));
startSnapshot = snapshotInfo.getName();
}
}
- assertEquals(snapshotsASet, expectedSnapshot);
+ assertEquals(snapshotsASnapshotIDMap.keySet(), expectedSnapshot);
// As now we have iterated all 50 snapshots, calling next time should
// return empty list.
snapshotInfos = omMetadataManager.listSnapshot(vol1, bucket1,
- startSnapshot, prefixA, 10);
+ prefixA, startSnapshot, 10);
assertEquals(snapshotInfos.size(), 0);
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 45209258f7..ac9dd41409 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -503,23 +503,25 @@ public final class OMRequestTestUtils {
/**
* Add snapshot entry to DB.
*/
- public static void addSnapshotToTable(
+ public static SnapshotInfo addSnapshotToTable(
String volumeName, String bucketName, String snapshotName,
OMMetadataManager omMetadataManager) throws IOException {
SnapshotInfo snapshotInfo = SnapshotInfo.newInstance(volumeName,
bucketName, snapshotName, UUID.randomUUID(), Time.now());
addSnapshotToTable(false, 0L, snapshotInfo, omMetadataManager);
+ return snapshotInfo;
}
/**
* Add snapshot entry to snapshot table cache.
*/
- public static void addSnapshotToTableCache(
+ public static SnapshotInfo addSnapshotToTableCache(
String volumeName, String bucketName, String snapshotName,
OMMetadataManager omMetadataManager) throws IOException {
SnapshotInfo snapshotInfo = SnapshotInfo.newInstance(volumeName,
bucketName,
snapshotName, UUID.randomUUID(), Time.now());
addSnapshotToTable(true, 0L, snapshotInfo, omMetadataManager);
+ return snapshotInfo;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]