IGNITE-2810: IGFS: Striped trash directory to reduce contention during removals.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d14842a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d14842a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d14842a Branch: refs/heads/ignite-1786 Commit: 2d14842aba6a53d689d4b4f90a907e2c0f15d291 Parents: c53ffa9 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Mon Mar 14 11:49:03 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Mon Mar 14 11:49:03 2016 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsDeleteWorker.java | 28 ++-- .../internal/processors/igfs/IgfsFileInfo.java | 9 +- .../internal/processors/igfs/IgfsImpl.java | 5 +- .../processors/igfs/IgfsMetaManager.java | 146 ++++++++++--------- .../processors/igfs/IgfsOutputStreamImpl.java | 2 +- .../internal/processors/igfs/IgfsUtils.java | 72 +++++++++ .../processors/igfs/IgfsAbstractSelfTest.java | 21 ++- .../igfs/IgfsDataManagerSelfTest.java | 6 +- .../igfs/IgfsMetaManagerSelfTest.java | 2 +- .../processors/igfs/IgfsProcessorSelfTest.java | 9 +- .../processors/igfs/IgfsSizeSelfTest.java | 13 +- 11 files changed, 199 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java index 95a6a5d..ffddd3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java @@ -39,7 +39,6 @@ import 
org.apache.ignite.lang.IgniteUuid; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID; /** * IGFS worker for removal from the trash directory. @@ -150,13 +149,23 @@ public class IgfsDeleteWorker extends IgfsThread { } /** - * Perform cleanup of the trash directory. + * Perform cleanup of trash directories. */ private void delete() { + for (int i = 0; i < IgfsUtils.TRASH_CONCURRENCY; i++) + delete(IgfsUtils.trashId(i)); + } + + /** + * Perform cleanup of concrete trash directory. + * + * @param trashId Trash ID. + */ + private void delete(IgniteUuid trashId) { IgfsFileInfo info = null; try { - info = meta.info(TRASH_ID); + info = meta.info(trashId); } catch(ClusterTopologyServerNotFoundException e) { LT.warn(log, e, "Server nodes not found."); @@ -174,7 +183,7 @@ public class IgfsDeleteWorker extends IgfsThread { try { if (!cancelled) { - if (delete(entry.getKey(), fileId)) { + if (delete(trashId, entry.getKey(), fileId)) { if (log.isDebugEnabled()) log.debug("Sending delete confirmation message [name=" + entry.getKey() + ", fileId=" + fileId + ']'); @@ -200,12 +209,13 @@ public class IgfsDeleteWorker extends IgfsThread { /** * Remove particular entry from the TRASH directory. * + * @param trashId ID of the trash directory. * @param name Entry name. * @param id Entry ID. * @return {@code True} in case the entry really was deleted form the file system by this call. * @throws IgniteCheckedException If failed. 
*/ - private boolean delete(String name, IgniteUuid id) throws IgniteCheckedException { + private boolean delete(IgniteUuid trashId, String name, IgniteUuid id) throws IgniteCheckedException { assert name != null; assert id != null; @@ -214,10 +224,10 @@ public class IgfsDeleteWorker extends IgfsThread { if (info != null) { if (info.isDirectory()) { - if (!deleteDirectoryContents(TRASH_ID, id)) + if (!deleteDirectoryContents(trashId, id)) return false; - if (meta.delete(TRASH_ID, name, id)) + if (meta.delete(trashId, name, id)) return true; } else { @@ -235,7 +245,7 @@ public class IgfsDeleteWorker extends IgfsThread { // In case this node crashes, other node will re-delete the file. data.delete(lockedInfo).get(); - boolean ret = meta.delete(TRASH_ID, name, id); + boolean ret = meta.delete(trashId, name, id); if (info.path() != null) IgfsUtils.sendEvents(igfsCtx.kernalContext(), info.path(), EVT_IGFS_FILE_PURGED); @@ -299,7 +309,7 @@ public class IgfsDeleteWorker extends IgfsThread { // File is already locked: failedFiles++; else { - assert IgfsMetaManager.DELETE_LOCK_ID.equals(lockedInfo.lockId()); + assert IgfsUtils.DELETE_LOCK_ID.equals(lockedInfo.lockId()); fut.add(data.delete(lockedInfo)); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java index a69920b..0a85657 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java @@ -23,7 +23,6 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collections; import java.util.Map; -import java.util.UUID; import 
org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.util.GridLeanMap; @@ -41,12 +40,6 @@ public final class IgfsFileInfo implements Externalizable { /** */ private static final long serialVersionUID = 0L; - /** ID for the root directory. */ - public static final IgniteUuid ROOT_ID = new IgniteUuid(new UUID(0, 0), 0); - - /** ID of the trash directory. */ - public static final IgniteUuid TRASH_ID = new IgniteUuid(new UUID(0, 1), 0); - /** Special access time value, indicating that the modification time value should be taken. */ private static final long ACCESS_TIME_TAKE_MODIFICATION_TIME = -1L; @@ -94,7 +87,7 @@ public final class IgfsFileInfo implements Externalizable { * {@link Externalizable} support. */ public IgfsFileInfo() { - this(ROOT_ID); + this(IgfsUtils.ROOT_ID); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index c1e47a8..0e52927 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -111,7 +111,6 @@ import static org.apache.ignite.igfs.IgfsMode.PRIMARY; import static org.apache.ignite.igfs.IgfsMode.PROXY; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID; /** * Cache-based IGFS implementation. 
@@ -1206,7 +1205,7 @@ public final class IgfsImpl implements IgfsEx { @Override public IgfsMetrics call() throws Exception { IgfsPathSummary sum = new IgfsPathSummary(); - summary0(ROOT_ID, sum); + summary0(IgfsUtils.ROOT_ID, sum); long secondarySpaceSize = 0; @@ -1279,7 +1278,7 @@ public final class IgfsImpl implements IgfsEx { if (info != null) { if (info.isDirectory()) { - if (!ROOT_ID.equals(info.id())) + if (!IgfsUtils.ROOT_ID.equals(info.id())) sum.directoriesCount(sum.directoriesCount() + 1); for (IgfsListingEntry entry : info.listing().values()) http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index c120b9d..89ddd02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -37,7 +37,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; @@ -91,8 +90,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID; import static 
org.apache.ignite.internal.processors.igfs.IgfsFileInfo.builder; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -102,9 +99,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA */ @SuppressWarnings("all") public class IgfsMetaManager extends IgfsManager { - /** Lock Id used to lock files being deleted from TRASH. This is a global constant. */ - static final IgniteUuid DELETE_LOCK_ID = new IgniteUuid(new UUID(0L, 0L), 0L); - /** Comparator for Id sorting. */ private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR = new Comparator<IgniteUuid>() { @@ -359,9 +353,9 @@ public class IgfsMetaManager extends IgfsManager { // Collection of file IDs for components of specified path. List<IgniteUuid> ids = new ArrayList<>(components.size() + 1); - ids.add(ROOT_ID); // Always add root ID. + ids.add(IgfsUtils.ROOT_ID); // Always add root ID. - IgniteUuid fileId = ROOT_ID; + IgniteUuid fileId = IgfsUtils.ROOT_ID; for (String s : components) { assert !s.isEmpty(); @@ -416,8 +410,8 @@ public class IgfsMetaManager extends IgfsManager { IgfsFileInfo info = id2InfoPrj.get(fileId); // Force root ID always exist in cache. - if (info == null && ROOT_ID.equals(fileId)) - id2InfoPrj.putIfAbsent(ROOT_ID, info = new IgfsFileInfo()); + if (info == null && IgfsUtils.ROOT_ID.equals(fileId)) + id2InfoPrj.putIfAbsent(IgfsUtils.ROOT_ID, info = new IgfsFileInfo()); return info; } @@ -448,14 +442,14 @@ public class IgfsMetaManager extends IgfsManager { Map<IgniteUuid, IgfsFileInfo> map = id2InfoPrj.getAll(fileIds); // Force root ID always exist in cache. 
- if (fileIds.contains(ROOT_ID) && !map.containsKey(ROOT_ID)) { + if (fileIds.contains(IgfsUtils.ROOT_ID) && !map.containsKey(IgfsUtils.ROOT_ID)) { IgfsFileInfo info = new IgfsFileInfo(); - id2InfoPrj.putIfAbsent(ROOT_ID, info); + id2InfoPrj.putIfAbsent(IgfsUtils.ROOT_ID, info); map = new GridLeanMap<>(map); - map.put(ROOT_ID, info); + map.put(IgfsUtils.ROOT_ID, info); } return map; @@ -545,7 +539,7 @@ public class IgfsMetaManager extends IgfsManager { */ private IgniteUuid composeLockId(boolean isDeleteLock) { if (isDeleteLock) - return DELETE_LOCK_ID; + return IgfsUtils.DELETE_LOCK_ID; return IgniteUuid.fromUuid(locNode.id()); } @@ -675,36 +669,38 @@ public class IgfsMetaManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Locked file ids: " + fileIds); - // Force root & trash IDs always exist in cache. - addInfoIfNeeded(fileIds, map, ROOT_ID); - addInfoIfNeeded(fileIds, map, TRASH_ID); + for (IgniteUuid fileId : fileIds) { + if (IgfsUtils.isRootOrTrashId(fileId)) { + if (!map.containsKey(fileId)) + map.put(fileId, createSystemEntryIfAbsent(fileId)); + } + } // Returns detail's map for locked IDs. return map; } /** - * Adds FileInfo into the cache if it is requested in fileIds and is not present in the map. + * Create system entry if it is absent. * - * @param fileIds A list that may contain the id. - * @param map The map that may not contain the id. - * @param id The id to check. + * @param id System entry ID. + * @return Value of created or existing system entry. * @throws IgniteCheckedException On error. 
*/ - private void addInfoIfNeeded(Collection<IgniteUuid> fileIds, Map<IgniteUuid, IgfsFileInfo> map, IgniteUuid id) + private IgfsFileInfo createSystemEntryIfAbsent(IgniteUuid id) throws IgniteCheckedException { assert validTxState(true); - if (fileIds.contains(id) && !map.containsKey(id)) { - IgfsFileInfo info = new IgfsFileInfo(id); + assert IgfsUtils.isRootOrTrashId(id); - IgfsFileInfo anotherInfo = id2InfoPrj.getAndPutIfAbsent(id, info); + IgfsFileInfo info = new IgfsFileInfo(id); - if (anotherInfo != null) - info = anotherInfo; + IgfsFileInfo oldInfo = id2InfoPrj.getAndPutIfAbsent(id, info); - map.put(id, info); - } + if (oldInfo != null) + info = oldInfo; + + return info; } /** @@ -737,7 +733,7 @@ public class IgfsMetaManager extends IgfsManager { public IgfsFileInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { - return fileForFragmentizer0(ROOT_ID, exclude); + return fileForFragmentizer0(IgfsUtils.ROOT_ID, exclude); } finally { busyLock.leaveBusy(); @@ -1112,11 +1108,13 @@ public class IgfsMetaManager extends IgfsManager { try { assert validTxState(false); + IgniteUuid trashId = IgfsUtils.randomTrashId(); + final IgniteInternalTx tx = startTx(); try { // NB: We may lock root because its id is less than any other id: - final IgfsFileInfo rootInfo = lockIds(ROOT_ID, TRASH_ID).get(ROOT_ID); + final IgfsFileInfo rootInfo = lockIds(IgfsUtils.ROOT_ID, trashId).get(IgfsUtils.ROOT_ID); assert rootInfo != null; @@ -1135,12 +1133,12 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.put(newInfo.id(), newInfo); // Add new info to trash listing. - id2InfoPrj.invoke(TRASH_ID, new ListingAdd(newInfo.id().toString(), + id2InfoPrj.invoke(trashId, new ListingAdd(newInfo.id().toString(), new IgfsListingEntry(newInfo))); // Remove listing entries from root. 
// Note that root directory properties and other attributes are preserved: - id2InfoPrj.put(ROOT_ID, new IgfsFileInfo(null/*listing*/, rootInfo)); + id2InfoPrj.put(IgfsUtils.ROOT_ID, new IgfsFileInfo(null/*listing*/, rootInfo)); tx.commit(); @@ -1182,15 +1180,16 @@ public class IgfsMetaManager extends IgfsManager { final IgniteUuid victimId = pathIdList.get(pathIdList.size() - 1); - assert !TRASH_ID.equals(victimId) : "TRASH does not have path, it cannot ever be deletion victim."; - assert !ROOT_ID.equals(victimId); // root deletion is prevented in earlier stages. + assert !IgfsUtils.isRootOrTrashId(victimId) : "Cannot delete root or trash directories."; allIds.addAll(pathIdList); if (allIds.remove(null)) return null; // A fragment of the path no longer exists. - boolean added = allIds.add(TRASH_ID); + IgniteUuid trashId = IgfsUtils.randomTrashId(); + + boolean added = allIds.add(trashId); assert added; final IgniteInternalTx tx = startTx(); @@ -1209,7 +1208,7 @@ public class IgfsMetaManager extends IgfsManager { throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " + "empty and recursive flag is not set)."); - IgfsFileInfo destInfo = infoMap.get(TRASH_ID); + IgfsFileInfo destInfo = infoMap.get(trashId); assert destInfo != null; @@ -1237,7 +1236,7 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.invoke(srcParentId, new ListingRemove(srcFileName, srcEntry.fileId())); // Add listing entry into the destination parent listing. - id2InfoPrj.invoke(TRASH_ID, new ListingAdd(destFileName, srcEntry)); + id2InfoPrj.invoke(trashId, new ListingAdd(destFileName, srcEntry)); if (victimInfo.isFile()) // Update a file info of the removed file with a file path, @@ -1269,10 +1268,12 @@ public class IgfsMetaManager extends IgfsManager { * @param parentId Parent ID. * @param name Path name. * @param id Path ID. + * @param trashId Trash ID. * @return ID of an entry located directly under the trash directory. 
* @throws IgniteCheckedException If failed. */ - @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, @Nullable String name, IgniteUuid id) + @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, @Nullable String name, IgniteUuid id, + IgniteUuid trashId) throws IgniteCheckedException { assert validTxState(true); @@ -1280,16 +1281,15 @@ public class IgfsMetaManager extends IgfsManager { if (parentId == null) { // Handle special case when we deleting root directory. - assert ROOT_ID.equals(id); + assert IgfsUtils.ROOT_ID.equals(id); - IgfsFileInfo rootInfo = id2InfoPrj.get(ROOT_ID); + IgfsFileInfo rootInfo = id2InfoPrj.get(IgfsUtils.ROOT_ID); if (rootInfo == null) return null; // Root was never created. // Ensure trash directory existence. - if (id2InfoPrj.get(TRASH_ID) == null) - id2InfoPrj.getAndPut(TRASH_ID, new IgfsFileInfo(TRASH_ID)); + createSystemEntryIfAbsent(trashId); Map<String, IgfsListingEntry> rootListing = rootInfo.listing(); @@ -1314,12 +1314,12 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.getAndPut(newInfo.id(), newInfo); // Add new info to trash listing. - id2InfoPrj.invoke(TRASH_ID, new ListingAdd(newInfo.id().toString(), + id2InfoPrj.invoke(trashId, new ListingAdd(newInfo.id().toString(), new IgfsListingEntry(newInfo))); // Remove listing entries from root. for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet()) - id2InfoPrj.invoke(ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId())); + id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId())); resId = newInfo.id(); } @@ -1328,10 +1328,9 @@ public class IgfsMetaManager extends IgfsManager { } else { // Ensure trash directory existence. 
- if (id2InfoPrj.get(TRASH_ID) == null) - id2InfoPrj.getAndPut(TRASH_ID, new IgfsFileInfo(TRASH_ID)); + createSystemEntryIfAbsent(trashId); - moveNonTx(id, name, parentId, id.toString(), TRASH_ID); + moveNonTx(id, name, parentId, id.toString(), trashId); resId = id; } @@ -1390,7 +1389,7 @@ public class IgfsMetaManager extends IgfsManager { if (entryInfo != null) { // File must be locked for deletion: - assert entryInfo.isDirectory() || DELETE_LOCK_ID.equals(entryInfo.lockId()); + assert entryInfo.isDirectory() || IgfsUtils.DELETE_LOCK_ID.equals(entryInfo.lockId()); // Delete only files or empty folders. if (entryInfo.isFile() || entryInfo.isDirectory() && entryInfo.listing().isEmpty()) { @@ -1457,8 +1456,8 @@ public class IgfsMetaManager extends IgfsManager { if (victim == null) return res; - assert victim.isDirectory() || DELETE_LOCK_ID.equals(victim.lockId()) : - " isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId(); + assert victim.isDirectory() || IgfsUtils.DELETE_LOCK_ID.equals(victim.lockId()) : + " isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId(); // Proceed only in case both parent and child exist. 
if (infos.containsKey(parentId) && infos.containsKey(id)) { @@ -1504,21 +1503,24 @@ public class IgfsMetaManager extends IgfsManager { public Collection<IgniteUuid> pendingDeletes() throws IgniteCheckedException { if (busyLock.enterBusy()) { try { - IgfsFileInfo trashInfo = id2InfoPrj.get(TRASH_ID); + Collection<IgniteUuid> ids = new HashSet<>(); - if (trashInfo != null) { - Map<String, IgfsListingEntry> listing = trashInfo.listing(); + for (int i = 0; i < IgfsUtils.TRASH_CONCURRENCY; i++) { + IgniteUuid trashId = IgfsUtils.trashId(i); - if (listing != null && !listing.isEmpty()) { - return F.viewReadOnly(listing.values(), new IgniteClosure<IgfsListingEntry, IgniteUuid>() { - @Override public IgniteUuid apply(IgfsListingEntry e) { - return e.fileId(); - } - }); + IgfsFileInfo trashInfo = id2InfoPrj.get(trashId); + + if (trashInfo != null) { + Map<String, IgfsListingEntry> listing = trashInfo.listing(); + + if (listing != null && !listing.isEmpty()) { + for (IgfsListingEntry entry : listing.values()) + ids.add(entry.fileId()); + } } } - return Collections.emptySet(); + return ids; } finally { busyLock.leaveBusy(); @@ -2455,6 +2457,8 @@ public class IgfsMetaManager extends IgfsManager { assert fs != null; assert path != null; + final IgniteUuid trashId = IgfsUtils.randomTrashId(); + SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() { @Override public Boolean onSuccess(Map<IgfsPath, IgfsFileInfo> infos) throws Exception { IgfsFileInfo info = infos.get(path); @@ -2468,12 +2472,12 @@ public class IgfsMetaManager extends IgfsManager { if (path.parent() != null) { assert infos.containsKey(path.parent()); - softDeleteNonTx(infos.get(path.parent()).id(), path.name(), info.id()); + softDeleteNonTx(infos.get(path.parent()).id(), path.name(), info.id(), trashId); } else { - assert ROOT_ID.equals(info.id()); + assert IgfsUtils.ROOT_ID.equals(info.id()); - softDeleteNonTx(null, path.name(), info.id()); + softDeleteNonTx(null, path.name(), info.id(), 
trashId); } // Update the deleted file info with path information for delete worker. @@ -2491,7 +2495,7 @@ public class IgfsMetaManager extends IgfsManager { } }; - Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(TRASH_ID), path); + Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(trashId), path); delWorker.signal(); @@ -3441,6 +3445,8 @@ public class IgfsMetaManager extends IgfsManager { DirectoryChainBuilder b = null; + IgniteUuid trashId = IgfsUtils.randomTrashId(); + while (true) { if (busyLock.enterBusy()) { try { @@ -3461,11 +3467,11 @@ public class IgfsMetaManager extends IgfsManager { if (overwrite) // Lock also the TRASH directory because in case of overwrite we // may need to delete the old file: - b.idSet.add(TRASH_ID); + b.idSet.add(trashId); final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet); - assert !overwrite || lockedInfos.get(TRASH_ID) != null; // TRASH must exist at this point. + assert !overwrite || lockedInfos.get(trashId) != null; // TRASH must exist at this point. // If the path was changed, we close the current Tx and repeat the procedure again // starting from taking the path ids. @@ -3536,8 +3542,8 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.invoke(parentId, new ListingRemove(name, deletedEntry.fileId())); // Add listing entry into the destination parent listing. - id2InfoPrj.invoke(TRASH_ID, new ListingAdd( - lowermostExistingInfo.id().toString(), deletedEntry)); + id2InfoPrj.invoke(trashId, + new ListingAdd(lowermostExistingInfo.id().toString(), deletedEntry)); // Update a file info of the removed file with a file path, // which will be used by delete worker for event notifications. 
@@ -3697,7 +3703,7 @@ public class IgfsMetaManager extends IgfsManager { idIdx++; } - assert idSet.contains(ROOT_ID); + assert idSet.contains(IgfsUtils.ROOT_ID); this.lowermostExistingId = lowermostExistingId; http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java index c9225ae..83056af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java @@ -121,7 +121,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { if (fileInfo.lockId() == null) throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path); - assert !IgfsMetaManager.DELETE_LOCK_ID.equals(fileInfo.lockId()); + assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId()); this.igfsCtx = igfsCtx; meta = igfsCtx.meta(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 07fdda4..6f8960a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.processors.igfs; import java.lang.reflect.Constructor; +import 
java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; @@ -36,6 +39,7 @@ import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; @@ -47,10 +51,78 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * Common IGFS utility methods. */ public class IgfsUtils { + /** ID for the root directory. */ + public static final IgniteUuid ROOT_ID = new IgniteUuid(new UUID(0, 0), 0); + + /** Lock Id used to lock files being deleted from TRASH. This is a global constant. */ + public static final IgniteUuid DELETE_LOCK_ID = new IgniteUuid(new UUID(0, 0), 0); + + /** Constant trash concurrency level. */ + public static final int TRASH_CONCURRENCY = 16; + + /** Trash directory IDs. */ + private static final IgniteUuid[] TRASH_IDS; + /** Maximum number of file unlock transaction retries when topology changes. */ private static final int MAX_CACHE_TX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100); /** + * Static initializer. + */ + static { + TRASH_IDS = new IgniteUuid[TRASH_CONCURRENCY]; + + for (int i = 0; i < TRASH_CONCURRENCY; i++) + TRASH_IDS[i] = new IgniteUuid(new UUID(0, i + 1), 0); + } + + /** + * Get random trash ID. + * + * @return Trash ID. + */ + public static IgniteUuid randomTrashId() { + return TRASH_IDS[ThreadLocalRandom.current().nextInt(TRASH_CONCURRENCY)]; + } + + /** + * Get trash ID for the given index. + * + * @param idx Index. + * @return Trash ID. 
+ */ + public static IgniteUuid trashId(int idx) { + assert idx >= 0 && idx < TRASH_CONCURRENCY; + + return TRASH_IDS[idx]; + } + + /** + * Check whether provided ID is trash ID. + * + * @param id ID. + * @return {@code True} if this is trash ID. + */ + public static boolean isTrashId(IgniteUuid id) { + assert id != null; + + UUID gid = id.globalId(); + + return id.localId() == 0 && gid.getMostSignificantBits() == 0 && + gid.getLeastSignificantBits() > 0 && gid.getLeastSignificantBits() <= TRASH_CONCURRENCY; + } + + /** + * Check whether provided ID is either root ID or trash ID. + * + * @param id ID. + * @return {@code True} if this is root ID or trash ID. + */ + public static boolean isRootOrTrashId(IgniteUuid id) { + return ROOT_ID.equals(id) || isTrashId(id); + } + + /** * Converts any passed exception to IGFS exception. * * @param err Initial exception. http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 015b992..52d8bd5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -3138,14 +3138,22 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { igfs.format(); int prevDifferentSize = Integer.MAX_VALUE; // Previous different size. 
- int size; int constCnt = 0, totalCnt = 0; final int constThreshold = 20; final long sleepPeriod = 500L; final long totalThreshold = CACHE_EMPTY_TIMEOUT / sleepPeriod; while (true) { - size = sumCacheSize(igfs); + int metaSize = 0; + + for (IgniteUuid metaId : getMetaCache(igfs).keySet()) { + if (!IgfsUtils.isRootOrTrashId(metaId)) + metaSize++; + } + + int dataSize = getDataCache(igfs).size(); + + int size = metaSize + dataSize; if (size <= 2) return; // Caches are cleared, we're done. (2 because ROOT & TRASH always exist). @@ -3206,15 +3214,6 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } /** - * Gets summary IGFS cache size. - * @param igfs The IGFS to measure. - * @return data cache size + meta cache size. - */ - private static int sumCacheSize(IgniteFileSystem igfs) { - return getMetaCache(igfs).size() + getDataCache(igfs).size(); - } - - /** * Clear particular {@link UniversalFileSystemAdapter}. * * @param uni IGFS. http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java index 8d5ae7a..06ae40b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java @@ -168,7 +168,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest { long t = System.currentTimeMillis(); - IgfsFileInfo info = new IgfsFileInfo(200, 0L, null, IgfsMetaManager.DELETE_LOCK_ID, + IgfsFileInfo info = new IgfsFileInfo(200, 0L, null, IgfsUtils.DELETE_LOCK_ID, false, null, t, t); assertNull(mgr.dataBlock(info, 
path, 0, null).get()); @@ -253,7 +253,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest { long t = System.currentTimeMillis(); - IgfsFileInfo info = new IgfsFileInfo(blockSize, 0L, null, IgfsMetaManager.DELETE_LOCK_ID, + IgfsFileInfo info = new IgfsFileInfo(blockSize, 0L, null, IgfsUtils.DELETE_LOCK_ID, false, null, t, t); assertNull(mgr.dataBlock(info, path, 0, null).get()); @@ -345,7 +345,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest { long t = System.currentTimeMillis(); IgfsFileInfo info = - new IgfsFileInfo(blockSize, 0L, null, IgfsMetaManager.DELETE_LOCK_ID, + new IgfsFileInfo(blockSize, 0L, null, IgfsUtils.DELETE_LOCK_ID, false, null, t, t); IgfsFileAffinityRange range = new IgfsFileAffinityRange(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java index df519ed..f3d35f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java @@ -42,7 +42,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID; +import static org.apache.ignite.internal.processors.igfs.IgfsUtils.ROOT_ID; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsInherited; http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java index c6853ae..f567099 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java @@ -778,7 +778,14 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest { assert !igfs.exists(path(dirPath)); assert !igfs.exists(path(filePath)); - assert grid(0).cachex(igfs.configuration().getMetaCacheName()).size() == 2; // ROOT + TRASH. + int metaSize = 0; + + for (Object metaId : grid(0).cachex(igfs.configuration().getMetaCacheName()).keySet()) { + if (!IgfsUtils.isRootOrTrashId((IgniteUuid)metaId)) + metaSize++; + } + + assert metaSize == 0; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2d14842a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java index 9c90534..fcf4b3b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java @@ -59,8 +59,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static 
org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -538,26 +536,27 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest { }).start(); // Now add file ID to trash listing so that delete worker could "see" it. + IgniteUuid trashId = IgfsUtils.randomTrashId(); try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(), new IgfsListingEntry(info)); // Clear root listing. - metaCache.put(ROOT_ID, new IgfsFileInfo(ROOT_ID)); + metaCache.put(IgfsUtils.ROOT_ID, new IgfsFileInfo(IgfsUtils.ROOT_ID)); // Add file to trash listing. - IgfsFileInfo trashInfo = metaCache.get(TRASH_ID); + IgfsFileInfo trashInfo = metaCache.get(trashId); if (trashInfo == null) - metaCache.put(TRASH_ID, new IgfsFileInfo(listing, new IgfsFileInfo(TRASH_ID))); + metaCache.put(trashId, new IgfsFileInfo(listing, new IgfsFileInfo(trashId))); else - metaCache.put(TRASH_ID, new IgfsFileInfo(listing, trashInfo)); + metaCache.put(trashId, new IgfsFileInfo(listing, trashInfo)); tx.commit(); } - assert metaCache.get(TRASH_ID) != null; + assert metaCache.get(trashId) != null; // Now the file is locked and is located in trash, try adding some more data. os = igfs.create(otherPath, false);