IGNITE-1590: Reworked create and append operations to match overall design.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/962fcce3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/962fcce3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/962fcce3 Branch: refs/heads/master Commit: 962fcce3acbecd028c4787a6255fedcdcbdf9db1 Parents: 6844370 Author: iveselovskiy <[email protected]> Authored: Wed Oct 14 15:59:57 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Oct 14 15:59:57 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsDataManager.java | 2 - .../processors/igfs/IgfsDeleteWorker.java | 102 ++- .../internal/processors/igfs/IgfsImpl.java | 164 +--- .../processors/igfs/IgfsMetaManager.java | 897 ++++++++++++------- .../processors/igfs/IgfsOutputStreamImpl.java | 2 + .../internal/processors/igfs/IgfsUtils.java | 23 + .../ignite/igfs/IgfsEventsAbstractSelfTest.java | 6 +- .../processors/igfs/IgfsAbstractSelfTest.java | 639 ++++++++++--- .../igfs/IgfsDataManagerSelfTest.java | 13 +- .../igfs/IgfsMetaManagerSelfTest.java | 170 ++-- .../processors/igfs/IgfsProcessorSelfTest.java | 12 +- .../ignite/testsuites/IgniteIgfsTestSuite.java | 6 + .../ignite/igfs/Hadoop1DualAbstractTest.java | 5 - .../HadoopIgfs20FileSystemAbstractSelfTest.java | 5 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 2 +- 15 files changed, 1289 insertions(+), 759 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index b1b51f9..125d728 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -580,8 +580,6 @@ public class IgfsDataManager extends IgfsManager { * @return Delete future that will be completed when file is actually erased. */ public IgniteInternalFuture<Object> delete(IgfsFileInfo fileInfo) { - //assert validTxState(any); // Allow this method call for any transaction state. - if (!fileInfo.isFile()) { if (log.isDebugEnabled()) log.debug("Cannot delete content of not-data file: " + fileInfo); http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java index 98672e8..95a6a5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java @@ -27,12 +27,10 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -62,9 +60,6 @@ public class IgfsDeleteWorker extends IgfsThread { /** Data manager. */ private final IgfsDataManager data; - /** Event manager. */ - private final GridEventStorageManager evts; - /** Logger. */ private final IgniteLogger log; @@ -96,8 +91,6 @@ public class IgfsDeleteWorker extends IgfsThread { meta = igfsCtx.meta(); data = igfsCtx.data(); - evts = igfsCtx.kernalContext().event(); - String igfsName = igfsCtx.igfs().name(); topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); @@ -147,6 +140,9 @@ public class IgfsDeleteWorker extends IgfsThread { } } + /** + * Cancels the worker. + */ void cancel() { cancelled = true; @@ -218,7 +214,8 @@ public class IgfsDeleteWorker extends IgfsThread { if (info != null) { if (info.isDirectory()) { - deleteDirectory(TRASH_ID, id); + if (!deleteDirectoryContents(TRASH_ID, id)) + return false; if (meta.delete(TRASH_ID, name, id)) return true; @@ -226,19 +223,22 @@ public class IgfsDeleteWorker extends IgfsThread { else { assert info.isFile(); + // Lock the file with special lock Id to prevent concurrent writing: + IgfsFileInfo lockedInfo = meta.lock(id, true); + + if (lockedInfo == null) + return false; // File is locked, we cannot delete it. + + assert id.equals(lockedInfo.id()); + // Delete file content first. // In case this node crashes, other node will re-delete the file. - data.delete(info).get(); + data.delete(lockedInfo).get(); boolean ret = meta.delete(TRASH_ID, name, id); - if (evts.isRecordable(EVT_IGFS_FILE_PURGED)) { - if (info.path() != null) - evts.record(new IgfsEvent(info.path(), - igfsCtx.kernalContext().discovery().localNode(), EVT_IGFS_FILE_PURGED)); - else - LT.warn(log, null, "Removing file without path info: " + info); - } + if (info.path() != null) + IgfsUtils.sendEvents(igfsCtx.kernalContext(), info.path(), EVT_IGFS_FILE_PURGED); return ret; } @@ -253,9 +253,10 @@ public class IgfsDeleteWorker extends IgfsThread { * * @param parentId Parent ID. * @param id Entry id. + * @return true iff all the items in the directory were deleted (directory is seen to be empty). * @throws IgniteCheckedException If delete failed for some reason. */ - private void deleteDirectory(IgniteUuid parentId, IgniteUuid id) throws IgniteCheckedException { + private boolean deleteDirectoryContents(IgniteUuid parentId, final IgniteUuid id) throws IgniteCheckedException { assert parentId != null; assert id != null; @@ -265,47 +266,50 @@ public class IgfsDeleteWorker extends IgfsThread { if (info != null) { assert info.isDirectory(); - Map<String, IgfsListingEntry> listing = info.listing(); + final Map<String, IgfsListingEntry> listing = info.listing(); if (listing.isEmpty()) - return; // Directory is empty. + return true; // Directory is empty. - Map<String, IgfsListingEntry> delListing; + final Map<String, IgfsListingEntry> delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f); - if (listing.size() <= MAX_DELETE_BATCH) - delListing = listing; - else { - delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f); + final GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(); - int i = 0; + int failedFiles = 0; - for (Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) { - delListing.put(entry.getKey(), entry.getValue()); + for (final Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) { + if (cancelled) + return false; - if (++i == MAX_DELETE_BATCH) - break; + if (entry.getValue().isDirectory()) { + if (deleteDirectoryContents(id, entry.getValue().fileId())) // *** Recursive call. + delListing.put(entry.getKey(), entry.getValue()); + else + failedFiles++; } - } + else { + IgfsFileInfo fileInfo = meta.info(entry.getValue().fileId()); - GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(); + if (fileInfo != null) { + assert fileInfo.isFile(); - // Delegate to child folders. - for (IgfsListingEntry entry : delListing.values()) { - if (!cancelled) { - if (entry.isDirectory()) - deleteDirectory(id, entry.fileId()); - else { - IgfsFileInfo fileInfo = meta.info(entry.fileId()); + IgfsFileInfo lockedInfo = meta.lock(fileInfo.id(), true); + + if (lockedInfo == null) + // File is already locked: + failedFiles++; + else { + assert IgfsMetaManager.DELETE_LOCK_ID.equals(lockedInfo.lockId()); - if (fileInfo != null) { - assert fileInfo.isFile(); + fut.add(data.delete(lockedInfo)); - fut.add(data.delete(fileInfo)); + delListing.put(entry.getKey(), entry.getValue()); } } } - else - return; + + if (delListing.size() == MAX_DELETE_BATCH) + break; } fut.markInitialized(); @@ -318,17 +322,21 @@ public class IgfsDeleteWorker extends IgfsThread { // This future can be cancelled only due to IGFS shutdown. cancelled = true; - return; + return false; } // Actual delete of folder content. Collection<IgniteUuid> delIds = meta.delete(id, delListing); - if (delListing == listing && delListing.size() == delIds.size()) - break; // All entries were deleted. + if (listing.size() == delIds.size()) + return true; // All entries were deleted. + + if (listing.size() == delListing.size() + failedFiles) + // All the files were tried, no reason to continue the loop: + return false; } else - break; // Entry was deleted concurrently. + return true; // Directory entry was deleted concurrently. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index fa3a955..0d5cda3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -60,7 +60,6 @@ import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; import org.apache.ignite.igfs.IgfsPathIsDirectoryException; import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.IgfsPathSummary; @@ -97,7 +96,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; @@ -112,7 +110,6 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS; import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID; -import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID; /** * Cache-based IGFS implementation. @@ -122,7 +119,7 @@ public final class IgfsImpl implements IgfsEx { private static final String PERMISSION_DFLT_VAL = "0777"; /** Default directory metadata. */ - private static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL); + static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL); /** Handshake message. */ private final IgfsPaths secondaryPaths; @@ -740,14 +737,9 @@ public final class IgfsImpl implements IgfsEx { } // Record event if needed. - if (res && desc != null) { - if (desc.isFile) { - if (evts.isRecordable(EVT_IGFS_FILE_DELETED)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_DELETED)); - } - else if (evts.isRecordable(EVT_IGFS_DIR_DELETED)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_DIR_DELETED)); - } + if (res && desc != null) + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, + desc.isFile ? EVT_IGFS_FILE_DELETED : EVT_IGFS_DIR_DELETED); return res; } @@ -928,8 +920,7 @@ public final class IgfsImpl implements IgfsEx { IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, desc.info(), cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), metrics); - if (evts.isRecordable(EVT_IGFS_FILE_OPENED_READ)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_READ)); + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); return os; } @@ -949,8 +940,7 @@ public final class IgfsImpl implements IgfsEx { IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, info, cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, metrics); - if (evts.isRecordable(EVT_IGFS_FILE_OPENED_READ)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_READ)); + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); return os; } @@ -1004,7 +994,7 @@ public final class IgfsImpl implements IgfsEx { log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" + overwrite + ", props=" + props + ']'); - IgfsMode mode = resolveMode(path); + final IgfsMode mode = resolveMode(path); IgfsFileWorkerBatch batch; @@ -1021,71 +1011,28 @@ public final class IgfsImpl implements IgfsEx { IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(), desc.parentId(), bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch); - if (evts.isRecordable(EVT_IGFS_FILE_OPENED_WRITE)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_WRITE)); + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE); return os; } - // Re-create parents when working in PRIMARY mode. In DUAL mode this is done by MetaManager. - IgfsPath parent = path.parent(); - - // Create missing parent directories if necessary. - if (parent != null) - mkdirs(parent, props); - - List<IgniteUuid> ids = meta.fileIds(path); - - // Resolve parent ID for file. - IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null; - - if (parentId == null) - throw new IgfsPathNotFoundException("Failed to resolve parent directory: " + parent); - - String fileName = path.name(); - - // Constructs new file info. - IgfsFileInfo info = new IgfsFileInfo(cfg.getBlockSize(), affKey, evictExclude(path, true), props); - - // Add new file into tree structure. - while (true) { - IgniteUuid oldId = meta.putIfAbsent(parentId, fileName, info); - - if (oldId == null) - break; - - if (!overwrite) - throw new IgfsPathAlreadyExistsException("Failed to create file (file already exists): " + - path); - - IgfsFileInfo oldInfo = meta.info(oldId); - - assert oldInfo != null; - - if (oldInfo.isDirectory()) - throw new IgfsPathAlreadyExistsException("Failed to create file (path points to a " + - "directory): " + path); + final Map<String, String> dirProps, fileProps; - // Remove old file from the tree. - // Only one file is deleted, so we use internal data streamer. - deleteFile(path, new FileDescriptor(parentId, fileName, oldId, oldInfo.isFile()), false); + if (props == null) { + dirProps = DFLT_DIR_META; - if (evts.isRecordable(EVT_IGFS_FILE_DELETED)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_DELETED)); + fileProps = null; } + else + dirProps = fileProps = new HashMap<>(props); - if (evts.isRecordable(EVT_IGFS_FILE_CREATED)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CREATED)); + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, false/*append*/, overwrite, dirProps, + cfg.getBlockSize(), affKey, evictExclude(path, true), fileProps); - info = meta.lock(info.id()); + assert t2 != null; - IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, info, parentId, + return new IgfsEventAwareOutputStream(path, t2.get1(), t2.get2(), bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, null); - - if (evts.isRecordable(EVT_IGFS_FILE_OPENED_WRITE)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_WRITE)); - - return os; } }); } @@ -1107,7 +1054,7 @@ public final class IgfsImpl implements IgfsEx { log.debug("Open file for appending [path=" + path + ", bufSize=" + bufSize + ", create=" + create + ", props=" + props + ']'); - IgfsMode mode = resolveMode(path); + final IgfsMode mode = resolveMode(path); IgfsFileWorkerBatch batch; @@ -1124,46 +1071,39 @@ public final class IgfsImpl implements IgfsEx { bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch); } - List<IgniteUuid> ids = meta.fileIds(path); + final List<IgniteUuid> ids = meta.fileIds(path); - IgfsFileInfo info = meta.info(ids.get(ids.size() - 1)); + final IgniteUuid id = ids.get(ids.size() - 1); - // Resolve parent ID for the file. - IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null; - - if (info == null) { + if (id == null) { if (!create) { checkConflictWithPrimary(path); throw new IgfsPathNotFoundException("File not found: " + path); } + } - if (parentId == null) - throw new IgfsPathNotFoundException("Failed to resolve parent directory: " + path.parent()); - - info = new IgfsFileInfo(cfg.getBlockSize(), /**affinity key*/null, evictExclude(path, true), props); + // Prevent attempt to append to ROOT in early stage: + if (ids.size() == 1) + throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path); - IgniteUuid oldId = meta.putIfAbsent(parentId, path.name(), info); + final Map<String, String> dirProps, fileProps; - if (oldId != null) - info = meta.info(oldId); + if (props == null) { + dirProps = DFLT_DIR_META; - if (evts.isRecordable(EVT_IGFS_FILE_CREATED)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CREATED)); + fileProps = null; } + else + dirProps = fileProps = new HashMap<>(props); - assert info != null; - - if (!info.isFile()) - throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path); - - info = meta.lock(info.id()); + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, true/*append*/, false/*overwrite*/, + dirProps, cfg.getBlockSize(), null/*affKey*/, evictExclude(path, true), fileProps); - if (evts.isRecordable(EVT_IGFS_FILE_OPENED_WRITE)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_WRITE)); + assert t2 != null; - return new IgfsEventAwareOutputStream(path, info, parentId, bufSize == 0 ? - cfg.getStreamBufferSize() : bufSize, mode, null); + return new IgfsEventAwareOutputStream(path, t2.get1(), t2.get2(), + bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, null); } }); } @@ -1451,30 +1391,6 @@ public final class IgfsImpl implements IgfsEx { } /** - * Remove file from the file system (structure and data). - * - * @param path Path of the deleted file. - * @param desc Detailed file descriptor to remove. - * @param rmvLocked Whether to remove this entry in case it is has explicit lock. - * @throws IgniteCheckedException If failed. - */ - private void deleteFile(IgfsPath path, FileDescriptor desc, boolean rmvLocked) throws IgniteCheckedException { - IgniteUuid parentId = desc.parentId; - IgniteUuid fileId = desc.fileId; - - if (parentId == null || ROOT_ID.equals(fileId)) { - assert parentId == null && ROOT_ID.equals(fileId) : "Invalid file descriptor: " + desc; - - return; // Never remove the root directory! - } - - if (TRASH_ID.equals(fileId)) - return; // Never remove trash directory. - - meta.removeIfEmpty(parentId, desc.fileName, fileId, path, rmvLocked); - } - - /** * Check whether IGFS with the same name exists among provided attributes. * * @param attrs Attributes. @@ -2005,13 +1921,13 @@ public final class IgfsImpl implements IgfsEx { /** * Perform IGFS operation in safe context. * - * @param action Action. + * @param act Action. * @return Result. */ - private <T> T safeOp(Callable<T> action) { + private <T> T safeOp(Callable<T> act) { if (enterBusy()) { try { - return action.call(); + return act.call(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 927067a..c016e46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -37,8 +37,11 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -46,6 +49,7 @@ import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.events.EventType; import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.igfs.IgfsConcurrentModificationException; import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; @@ -73,9 +77,10 @@ import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -83,9 +88,8 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_CREATED; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED; +import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID; import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID; import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.builder; @@ -97,6 +101,9 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA */ @SuppressWarnings("all") public class IgfsMetaManager extends IgfsManager { + /** Lock Id used to lock files being deleted from TRASH. This is a global constant. */ + static final IgniteUuid DELETE_LOCK_ID = new IgniteUuid(new UUID(0L, 0L), 0L); + /** Comparator for Id sorting. */ private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR = new Comparator<IgniteUuid>() { @@ -295,7 +302,8 @@ public class IgfsMetaManager extends IgfsManager { * @return File ID. * @throws IgniteCheckedException If failed. */ - @Nullable private IgniteUuid fileId(IgniteUuid parentId, String fileName, boolean skipTx) throws IgniteCheckedException { + @Nullable private IgniteUuid fileId(IgniteUuid parentId, String fileName, boolean skipTx) + throws IgniteCheckedException { IgfsListingEntry entry = directoryListing(parentId, skipTx).get(fileName); if (entry == null) { @@ -464,9 +472,9 @@ public class IgfsMetaManager extends IgfsManager { * * @param fileId File ID to lock. * @return Locked file info or {@code null} if file cannot be locked or doesn't exist. - * @throws IgniteCheckedException If failed. + * @throws IgniteCheckedException If the file with such id does not exist, or on another failure. */ - public IgfsFileInfo lock(IgniteUuid fileId) throws IgniteCheckedException { + public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean isDeleteLock) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { assert validTxState(false); @@ -479,14 +487,19 @@ public class IgfsMetaManager extends IgfsManager { IgfsFileInfo oldInfo = info(fileId); if (oldInfo == null) - throw new IgniteCheckedException("Failed to lock file (file not found): " + fileId); + return null; - IgfsFileInfo newInfo = lockInfo(oldInfo); + if (oldInfo.lockId() != null) + return null; // The file is already locked, we cannot lock it. - boolean put = metaCache.put(fileId, newInfo); + IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock); + + boolean put = metaCache.replace(fileId, oldInfo, newInfo); assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; + assert newInfo.id().equals(oldInfo.id()); // Same id. + tx.commit(); return newInfo; @@ -510,26 +523,26 @@ public class IgfsMetaManager extends IgfsManager { * Set lock on file info. * * @param info File info. - * @return New file info with lock set. + * @return New file info with lock set, or null if the info passed in is already locked. * @throws IgniteCheckedException In case lock is already set on that file. */ - public IgfsFileInfo lockInfo(IgfsFileInfo info) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - assert info != null; + private static @Nullable IgfsFileInfo lockInfo(IgfsFileInfo info, boolean isDeleteLock) { + assert info != null; - if (info.lockId() != null) - throw new IgniteCheckedException("Failed to lock file (file is being concurrently written) [fileId=" + - info.id() + ", lockId=" + info.lockId() + ']'); + if (info.lockId() != null) + return null; // Null return value indicates that the file is already locked. - return new IgfsFileInfo(info, IgniteUuid.randomUuid(), info.modificationTime()); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get lock info because Grid is stopping: " + info); + return new IgfsFileInfo(info, composeLockId(isDeleteLock), info.modificationTime()); + } + + /** + * Gets a new lock id. + * + * @param isDeleteLock if this is special delete lock. + * @return The new lock id. + */ + private static IgniteUuid composeLockId(boolean isDeleteLock) { + return isDeleteLock ? DELETE_LOCK_ID : IgniteUuid.randomUuid(); } /** @@ -556,23 +569,28 @@ public class IgfsMetaManager extends IgfsManager { try { IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() { @Override public Void applyx() throws IgniteCheckedException { + assert validTxState(true); + IgniteUuid fileId = info.id(); // Lock file ID for this transaction. IgfsFileInfo oldInfo = info(fileId); if (oldInfo == null) - throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId)); + throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " + + "found): " + fileId)); if (!info.lockId().equals(oldInfo.lockId())) - throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId + - ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']'); + throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " + + "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" + + oldInfo.lockId() + ']'); IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime); boolean put = metaCache.put(fileId, newInfo); - assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; + assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + + ']'; return null; } @@ -668,7 +686,8 @@ public class IgfsMetaManager extends IgfsManager { * @param id The id to check. * @throws IgniteCheckedException On error. */ - private void addInfoIfNeeded(Collection<IgniteUuid> fileIds, Map<IgniteUuid, IgfsFileInfo> map, IgniteUuid id) throws IgniteCheckedException { + private void addInfoIfNeeded(Collection<IgniteUuid> fileIds, Map<IgniteUuid, IgfsFileInfo> map, IgniteUuid id) + throws IgniteCheckedException { assert validTxState(true); if (fileIds.contains(id) && !map.containsKey(id)) { @@ -773,7 +792,8 @@ public class IgfsMetaManager extends IgfsManager { * @return Directory listing for the specified file.* * @throws IgniteCheckedException If failed. */ - private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) throws IgniteCheckedException { + private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) + throws IgniteCheckedException { assert fileId != null; IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singleton(fileId)).get(fileId) : @@ -783,48 +803,6 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Add file into file system structure. - * - * @param parentId Parent file ID. - * @param fileName File name in the parent's listing. - * @param newFileInfo File info to store in the parent's listing. - * @return File id already stored in meta cache or {@code null} if passed file info was stored. - * @throws IgniteCheckedException If failed. - */ - public IgniteUuid putIfAbsent(IgniteUuid parentId, String fileName, IgfsFileInfo newFileInfo) - throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - assert validTxState(false); - assert parentId != null; - assert fileName != null; - assert newFileInfo != null; - - IgniteUuid res = null; - - IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ); - - try { - res = putIfAbsentNonTx(parentId, fileName, newFileInfo); - - tx.commit(); - } - finally { - tx.close(); - } - - return res; - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to put file because Grid is stopping [parentId=" + parentId + - ", fileName=" + fileName + ", newFileInfo=" + newFileInfo + ']'); - } - - /** * Add file into file system structure. Do not create new transaction expecting that the one already exists. * * @param parentId Parent file ID. @@ -845,7 +823,8 @@ public class IgfsMetaManager extends IgfsManager { IgfsFileInfo parentInfo = info(parentId); if (parentInfo == null) - throw fsException(new IgfsPathNotFoundException("Failed to lock parent directory (not found): " + parentId)); + throw fsException(new IgfsPathNotFoundException("Failed to lock parent directory (not found): " + + parentId)); if (!parentInfo.isDirectory()) throw fsException(new IgfsPathIsNotDirectoryException("Parent file is not a directory: " + parentInfo)); @@ -1122,124 +1101,6 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Remove file from the file system structure. - * - * @param parentId Parent file ID. - * @param fileName New file name in the parent's listing. - * @param fileId File ID to remove. - * @param path Path of the deleted file. - * @param rmvLocked Whether to remove this entry in case it is has explicit lock. - * @return The last actual file info or {@code null} if such file no more exist. - * @throws IgniteCheckedException If failed. - */ - @Nullable public IgfsFileInfo removeIfEmpty(IgniteUuid parentId, String fileName, IgniteUuid fileId, - IgfsPath path, boolean rmvLocked) - throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - assert validTxState(false); - - IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ); - - try { - if (parentId != null) - lockIds(parentId, fileId, TRASH_ID); - else - lockIds(fileId, TRASH_ID); - - IgfsFileInfo fileInfo = removeIfEmptyNonTx(parentId, fileName, fileId, path, rmvLocked); - - tx.commit(); - - delWorker.signal(); - - return fileInfo; - } - finally { - tx.close(); - } - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to remove file system entry because Grid is stopping [parentId=" + - parentId + ", fileName=" + fileName + ", fileId=" + fileId + ", path=" + path + ']'); - } - - /** - * Remove file from the file system structure in existing transaction. - * - * @param parentId Parent file ID. - * @param fileName New file name in the parent's listing. - * @param fileId File ID to remove. - * @param path Path of the deleted file. - * @param rmvLocked Whether to remove this entry in case it has explicit lock. - * @return The last actual file info or {@code null} if such file no more exist. - * @throws IgniteCheckedException If failed. - */ - @Nullable private IgfsFileInfo removeIfEmptyNonTx(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId, - IgfsPath path, boolean rmvLocked) - throws IgniteCheckedException { - assert validTxState(true); - assert parentId != null; - assert fileName != null; - assert fileId != null; - assert !ROOT_ID.equals(fileId); - - if (log.isDebugEnabled()) - log.debug("Remove file: [parentId=" + parentId + ", fileName= " + fileName + ", fileId=" + fileId + ']'); - - // Safe gets because locks are obtained in removeIfEmpty. - IgfsFileInfo fileInfo = id2InfoPrj.get(fileId); - IgfsFileInfo parentInfo = id2InfoPrj.get(parentId); - - if (fileInfo == null || parentInfo == null) { - if (parentInfo != null) { // fileInfo == null - IgfsListingEntry entry = parentInfo.listing().get(fileName); - - // If file info does not exists but listing entry exists, throw inconsistent exception. - if (entry != null && entry.fileId().equals(fileId)) - throw new IgniteCheckedException("Failed to remove file (file system is in inconsistent state) " + - "[fileInfo=" + fileInfo + ", fileName=" + fileName + ", fileId=" + fileId + ']'); - } - - return null; // Parent directory or removed file cannot be locked (not found?). - } - - assert parentInfo.isDirectory(); - - if (!rmvLocked && fileInfo.lockId() != null) - throw fsException("Failed to remove file (file is opened for writing) [fileName=" + - fileName + ", fileId=" + fileId + ", lockId=" + fileInfo.lockId() + ']'); - - // Validate own directory listing. - if (fileInfo.isDirectory()) { - Map<String, IgfsListingEntry> listing = fileInfo.listing(); - - if (!F.isEmpty(listing)) - throw fsException(new IgfsDirectoryNotEmptyException("Failed to remove file (directory is not empty)" + - " [fileId=" + fileId + ", listing=" + listing + ']')); - } - - // Validate file in the parent listing. - IgfsListingEntry listingEntry = parentInfo.listing().get(fileName); - - if (listingEntry == null || !listingEntry.fileId().equals(fileId)) - return null; - - // Actual remove. - softDeleteNonTx(parentId, fileName, fileId); - - // Update a file info of the removed file with a file path, - // which will be used by delete worker for event notifications. - id2InfoPrj.invoke(fileId, new UpdatePath(path)); - - return builder(fileInfo).path(path).build(); - } - - /** * Deletes (moves to TRASH) all elements under the root folder. * * @return The new Id if the artificially created folder containing all former root @@ -1528,6 +1389,9 @@ public class IgfsMetaManager extends IgfsManager { IgfsFileInfo entryInfo = locks.get(entryId); if (entryInfo != null) { + // File must be locked for deletion: + assert entryInfo.isDirectory() || DELETE_LOCK_ID.equals(entryInfo.lockId()); + // Delete only files or empty folders. if (entryInfo.isFile() || entryInfo.isDirectory() && entryInfo.listing().isEmpty()) { id2InfoPrj.getAndRemove(entryId); @@ -1588,6 +1452,14 @@ public class IgfsMetaManager extends IgfsManager { Map<IgniteUuid, IgfsFileInfo> infos = lockIds(parentId, id); + IgfsFileInfo victim = infos.get(id); + + if (victim == null) + return res; + + assert victim.isDirectory() || DELETE_LOCK_ID.equals(victim.lockId()) : + " isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId(); + // Proceed only in case both parent and child exist. if (infos.containsKey(parentId) && infos.containsKey(id)) { IgfsFileInfo parentInfo = infos.get(parentId); @@ -1599,7 +1471,9 @@ public class IgfsMetaManager extends IgfsManager { if (listingEntry != null) id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true)); - id2InfoPrj.getAndRemove(id); + IgfsFileInfo deleted = id2InfoPrj.getAndRemove(id); + + assert victim.id().equals(deleted.id()); res = true; } @@ -1885,66 +1759,34 @@ public class IgfsMetaManager extends IgfsManager { assert props != null; assert validTxState(false); - List<String> components; - SortedSet<IgniteUuid> idSet; - IgfsPath existingPath; + DirectoryChainBuilder b = null; while (true) { if (busyLock.enterBusy()) { try { - // Take the ids in *path* order out of transaction: - final List<IgniteUuid> idList = fileIds(path); - - idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR); - - idSet.add(ROOT_ID); - - components = path.components(); - - // Store all the non-null ids in the set & construct existing path in one loop: - existingPath = path.root(); - - assert idList.size() == components.size() + 1; - - // Find the lowermost existing id: - IgniteUuid parentId = ROOT_ID; - - for (int i = 1; i < idList.size(); i++) { - IgniteUuid id = idList.get(i); - - if (id == null) - break; - - parentId = id; - - boolean added = idSet.add(id); - - assert added; - - existingPath = new IgfsPath(existingPath, components.get(i - 1)); - } + b = new DirectoryChainBuilder(path, props, props); // Start TX. IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ); try { - final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(idSet); + final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet); // If the path was changed, we close the current Tx and repeat the procedure again // starting from taking the path ids. - if (verifyPathIntegrity(existingPath, idList, lockedInfos)) { + if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) { // Locked path okay, trying to proceed with the remainder creation. - IgfsFileInfo parentInfo = lockedInfos.get(parentId); + IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId); // Check only the lowermost directory in the existing directory chain // because others are already checked in #verifyPathIntegrity() above. - if (!parentInfo.isDirectory()) + if (!lowermostExistingInfo.isDirectory()) throw new IgfsParentNotDirectoryException("Failed to create directory (parent " + "element is not a directory)"); - if (idSet.size() == components.size() + 1) { - assert existingPath.equals(path); - assert lockedInfos.size() == idSet.size(); + if (b.existingIdCnt == b.components.size() + 1) { + assert b.existingPath.equals(path); + assert lockedInfos.size() == b.existingIdCnt; // The target directory already exists, nothing to do. // (The fact that all the path consisns of directories is already checked above). @@ -1952,48 +1794,15 @@ public class IgfsMetaManager extends IgfsManager { return false; } - Map<String, IgfsListingEntry> parentListing = parentInfo.listing(); + Map<String, IgfsListingEntry> parentListing = lowermostExistingInfo.listing(); - String shortName = components.get(idSet.size() - 1); + String shortName = b.components.get(b.existingIdCnt - 1); IgfsListingEntry entry = parentListing.get(shortName); if (entry == null) { - IgfsFileInfo childInfo = null; - - String childName = null; - - IgfsFileInfo newDirInfo; - - // This loop creates the missing directory chain from the bottom to the top: - for (int i = components.size() - 1; i >= idSet.size() - 1; i--) { - // Required entry does not exist. - // Create new directory info: - if (childName == null) { - assert childInfo == null; - - newDirInfo = new IgfsFileInfo(true, props); - } - else { - assert childInfo != null; - - newDirInfo = new IgfsFileInfo(Collections.singletonMap(childName, - new IgfsListingEntry(childInfo)), props); - } - - boolean put = id2InfoPrj.putIfAbsent(newDirInfo.id(), newDirInfo); - - assert put; // Because we used a new id that should be unique. + b.doBuild(); - childInfo = newDirInfo; - childName = components.get(i); - } - - // Now link the newly created directory chain to the lowermost existing parent: - id2InfoPrj.invoke(parentId, - new UpdateListing(childName, new IgfsListingEntry(childInfo), false)); - - // We're close to finish: tx.commit(); break; @@ -2022,17 +1831,11 @@ public class IgfsMetaManager extends IgfsManager { } else throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']'); - } // retry loop - - if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) { - IgfsPath createdPath = existingPath; + } - for (int i = idSet.size() - 1; i < components.size(); i++) { - createdPath = new IgfsPath(createdPath, components.get(i)); + assert b != null; - evts.record(new IgfsEvent(createdPath, locNode, EVT_IGFS_DIR_CREATED)); - } - } + b.sendEvents(); return true; } @@ -2135,6 +1938,8 @@ public class IgfsMetaManager extends IgfsManager { @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsFileInfo> infos) throws Exception { + assert validTxState(true); + assert !infos.isEmpty(); // Determine the first existing parent. @@ -2186,7 +1991,7 @@ public class IgfsMetaManager extends IgfsManager { "the secondary file system because the path points to a directory: " + path); IgfsFileInfo newInfo = new IgfsFileInfo(status.blockSize(), status.length(), affKey, - IgniteUuid.randomUuid(), igfsCtx.igfs().evictExclude(path, false), status.properties()); + composeLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties()); // Add new file info to the listing optionally removing the previous one. IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo); @@ -2194,6 +1999,13 @@ public class IgfsMetaManager extends IgfsManager { if (oldId != null) { IgfsFileInfo oldInfo = info(oldId); + assert oldInfo != null; // Otherwise cache is in inconsistent state. + + // The contact is that we cannot overwrite a file locked for writing: + if (oldInfo.lockId() != null) + throw fsException("Failed to overwrite file (file is opened for writing) [path=" + + path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']'); + id2InfoPrj.remove(oldId); // Remove the old one. id2InfoPrj.put(newInfo.id(), newInfo); // Put the new one. @@ -2203,29 +2015,6 @@ public class IgfsMetaManager extends IgfsManager { new UpdateListing(path.name(), new IgfsListingEntry(newInfo), false)); IgniteInternalFuture<?> delFut = igfsCtx.data().delete(oldInfo); - - // Record PURGE event if needed. - if (evts.isRecordable(EVT_IGFS_FILE_PURGED)) { - delFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - try { - t.get(); // Ensure delete succeeded. - - evts.record(new IgfsEvent(path, locNode, EVT_IGFS_FILE_PURGED)); - } - catch (IgniteCheckedException e) { - LT.warn(log, e, "Old file deletion failed in DUAL mode [path=" + path + - ", simpleCreate=" + simpleCreate + ", props=" + props + - ", overwrite=" + overwrite + ", bufferSize=" + bufSize + - ", replication=" + replication + ", blockSize=" + blockSize + ']'); - } - } - }); - } - - // Record DELETE event if needed. - if (evts.isRecordable(EVT_IGFS_FILE_DELETED)) - pendingEvts.add(new IgfsEvent(path, locNode, EVT_IGFS_FILE_DELETED)); } // Record CREATE event if needed. @@ -2287,7 +2076,9 @@ public class IgfsMetaManager extends IgfsManager { @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsFileInfo> infos) throws Exception { - IgfsFileInfo info = infos.get(path); + assert validTxState(true); + + final IgfsFileInfo info = infos.get(path); if (info.isDirectory()) throw fsException("Failed to open output stream to the file in the " + @@ -2314,12 +2105,22 @@ public class IgfsMetaManager extends IgfsManager { } } + if (info.lockId() != null) { + throw fsException("Failed to open file (file is opened for writing) [path=" + + path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']'); + } + // Set lock and return. - info = lockInfo(info); + IgfsFileInfo lockedInfo = lockInfo(info, false); + + assert lockedInfo != null; // We checked the lock above. - metaCache.put(info.id(), info); + boolean put = metaCache.put(info.id(), lockedInfo); - return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(), info, out); + assert put; + + return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(), + lockedInfo, out); } @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err) @@ -2329,8 +2130,8 @@ public class IgfsMetaManager extends IgfsManager { U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize + ']', err); - throw new IgniteCheckedException("Failed to append to the file due to secondary file system " + - "exception: " + path, err); + throw new IgniteCheckedException("Failed to append to the file due to secondary file " + + "system exception: " + path, err); } }; @@ -2438,8 +2239,8 @@ public class IgfsMetaManager extends IgfsManager { } @Override public IgfsFileInfo onFailure(@Nullable Exception err) throws IgniteCheckedException { - throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " + - "exception: " + path, err); + throw new IgniteCheckedException("Failed to synchronize path due to secondary file " + + "system exception: " + path, err); } }; @@ -2517,8 +2318,8 @@ public class IgfsMetaManager extends IgfsManager { U.error(log, "Directory creation in DUAL mode failed [path=" + path + ", properties=" + props + ']', err); - throw new IgniteCheckedException("Failed to create the path due to secondary file system exception: " + - path, err); + throw new IgniteCheckedException("Failed to create the path due to secondary file system " + + "exception: " + path, err); } }; @@ -2685,8 +2486,8 @@ public class IgfsMetaManager extends IgfsManager { U.error(log, "Path delete in DUAL mode failed [path=" + path + ", recursive=" + recursive + ']', err); - throw new IgniteCheckedException("Failed to delete the path due to secondary file system exception: ", - err); + throw new IgniteCheckedException("Failed to delete the path due to secondary file system " + + "exception: ", err); } }; @@ -2713,8 +2514,8 @@ public class IgfsMetaManager extends IgfsManager { * @return Update file info. * @throws IgniteCheckedException If update failed. */ - public IgfsFileInfo updateDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final Map<String, String> props) - throws IgniteCheckedException { + public IgfsFileInfo updateDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, + final Map<String, String> props) throws IgniteCheckedException { assert fs != null; assert path != null; assert props != null && !props.isEmpty(); @@ -2740,8 +2541,8 @@ public class IgfsMetaManager extends IgfsManager { U.error(log, "Path update in DUAL mode failed [path=" + path + ", properties=" + props + ']', err); - throw new IgniteCheckedException("Failed to update the path due to secondary file system exception: " + - path, err); + throw new IgniteCheckedException("Failed to update the path due to secondary file system " + + "exception: " + path, err); } }; @@ -2805,8 +2606,8 @@ public class IgfsMetaManager extends IgfsManager { if (status != null) { if (!status.isDirectory() && !curPath.equals(endPath)) - throw new IgniteCheckedException("Failed to create path the locally because secondary file system " + - "directory structure was modified concurrently and the path is not a directory as " + + throw new IgniteCheckedException("Failed to create path the locally because secondary file " + + "system directory structure was modified concurrently and the path is not a directory as " + "expected: " + curPath); } else { @@ -3084,7 +2885,8 @@ public class IgfsMetaManager extends IgfsManager { * @return {@code True} if value was stored in cache, {@code false} otherwise. * @throws IgniteCheckedException If operation failed. */ - private <K, V> boolean putx(IgniteInternalCache<K, V> cache, K key, IgniteClosure<V, V> c) throws IgniteCheckedException { + private <K, V> boolean putx(IgniteInternalCache<K, V> cache, K key, IgniteClosure<V, V> c) + throws IgniteCheckedException { assert validTxState(true); V oldVal = cache.get(key); @@ -3549,4 +3351,455 @@ public class IgfsMetaManager extends IgfsManager { return S.toString(UpdatePath.class, this); } } + + /** + * Create a new file. + * + * @param path Path. + * @param bufSize Buffer size. + * @param overwrite Overwrite flag. + * @param affKey Affinity key. + * @param replication Replication factor. + * @param props Properties. + * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method. + * @return Tuple containing the created file info and its parent id. + */ + IgniteBiTuple<IgfsFileInfo, IgniteUuid> create( + final IgfsPath path, + final boolean append, + final boolean overwrite, + Map<String, String> dirProps, + final int blockSize, + final @Nullable IgniteUuid affKey, + final boolean evictExclude, + @Nullable Map<String, String> fileProps) throws IgniteCheckedException { + assert validTxState(false); + assert path != null; + + final String name = path.name(); + + DirectoryChainBuilder b = null; + + while (true) { + if (busyLock.enterBusy()) { + try { + b = new DirectoryChainBuilder(path, dirProps, fileProps) { + /** {@inheritDoc} */ + @Override protected IgfsFileInfo buildLeaf() { + return new IgfsFileInfo(blockSize, 0L, affKey, composeLockId(false), + evictExclude, leafProps); + } + }; + + // Start Tx: + IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ); + + try { + if (overwrite) + // Lock also the TRASH directory because in case of overwrite we + // may need to delete the old file: + b.idSet.add(TRASH_ID); + + final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet); + + assert !overwrite || lockedInfos.get(TRASH_ID) != null; // TRASH must exist at this point. + + // If the path was changed, we close the current Tx and repeat the procedure again + // starting from taking the path ids. + if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) { + // Locked path okay, trying to proceed with the remainder creation. + final IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId); + + if (b.existingIdCnt == b.components.size() + 1) { + // Full requestd path exists. + + assert b.existingPath.equals(path); + assert lockedInfos.size() == + (overwrite ? b.existingIdCnt + 1/*TRASH*/ : b.existingIdCnt); + + if (lowermostExistingInfo.isDirectory()) { + throw new IgfsPathAlreadyExistsException("Failed to " + + (append ? "open" : "create") + " file (path points to an " + + "existing directory): " + path); + } + else { + // This is a file. + assert lowermostExistingInfo.isFile(); + + final IgniteUuid parentId = b.idList.get(b.idList.size() - 2); + + final IgniteUuid lockId = lowermostExistingInfo.lockId(); + + if (append) { + if (lockId != null) + throw fsException("Failed to open file (file is opened for writing) " + + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id() + + ", lockId=" + lockId + ']'); + + IgniteUuid newLockId = composeLockId(false); + + EntryProcessorResult<IgfsFileInfo> result + = id2InfoPrj.invoke(lowermostExistingInfo.id(), + new LockFileProcessor(newLockId)); + + IgfsFileInfo lockedInfo = result.get(); + + assert lockedInfo != null; // we already checked lock above. + assert lockedInfo.lockId() != null; + assert lockedInfo.lockId().equals(newLockId); + assert lockedInfo.id().equals(lowermostExistingInfo.id()); + + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(lockedInfo, parentId); + + tx.commit(); + + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, + EventType.EVT_IGFS_FILE_OPENED_WRITE); + + return t2; + } + else if (overwrite) { + // Delete existing file, but fail if it is locked: + if (lockId != null) + throw fsException("Failed to overwrite file (file is opened for writing) " + + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id() + + ", lockId=" + lockId + ']'); + + final IgfsListingEntry deletedEntry = lockedInfos.get(parentId).listing() + .get(name); + + assert deletedEntry != null; + + id2InfoPrj.invoke(parentId, new UpdateListing(name, deletedEntry, true)); + + // Add listing entry into the destination parent listing. + id2InfoPrj.invoke(TRASH_ID, new UpdateListing( + lowermostExistingInfo.id().toString(), deletedEntry, false)); + + // Update a file info of the removed file with a file path, + // which will be used by delete worker for event notifications. + id2InfoPrj.invoke(lowermostExistingInfo.id(), new UpdatePath(path)); + + // Make a new locked info: + final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L, + affKey, composeLockId(false), evictExclude, fileProps); + + assert newFileInfo.lockId() != null; // locked info should be created. + + boolean put = id2InfoPrj.putIfAbsent(newFileInfo.id(), newFileInfo); + + assert put; + + id2InfoPrj.invoke(parentId, + new UpdateListing(name, new IgfsListingEntry(newFileInfo), false)); + + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newFileInfo, parentId); + + tx.commit(); + + delWorker.signal(); + + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE); + + return t2; + } + else { + throw new IgfsPathAlreadyExistsException("Failed to create file (file " + + "already exists and overwrite flag is false): " + path); + } + } + } + + // The full requested path does not exist. + + // Check only the lowermost directory in the existing directory chain + // because others are already checked in #verifyPathIntegrity() above. + if (!lowermostExistingInfo.isDirectory()) + throw new IgfsParentNotDirectoryException("Failed to " + (append ? "open" : "create" ) + + " file (parent element is not a directory)"); + + Map<String, IgfsListingEntry> parentListing = lowermostExistingInfo.listing(); + + final String uppermostFileToBeCreatedName = b.components.get(b.existingIdCnt - 1); + + final IgfsListingEntry entry = parentListing.get(uppermostFileToBeCreatedName); + + if (entry == null) { + b.doBuild(); + + assert b.leafInfo != null; + assert b.leafParentId != null; + + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(b.leafInfo, b.leafParentId); + + tx.commit(); + + b.sendEvents(); + + return t2; + } + + // Another thread concurrently created file or directory in the path with + // the name we need. + } + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } else + throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']'); + } + } + + /** File chain builder. */ + private class DirectoryChainBuilder { + /** The requested path to be created. */ + protected final IgfsPath path; + + /** Full path components. */ + protected final List<String> components; + + /** The list of ids. */ + protected final List<IgniteUuid> idList; + + /** The set of ids. */ + protected final SortedSet<IgniteUuid> idSet; + + /** The middle node properties. */ + protected final Map<String, String> middleProps; + + /** The leaf node properties. */ + protected final Map<String, String> leafProps; + + /** The lowermost exsiting path id. */ + protected final IgniteUuid lowermostExistingId; + + /** The existing path. */ + protected final IgfsPath existingPath; + + /** The created leaf info. */ + protected IgfsFileInfo leafInfo; + + /** The leaf parent id. */ + protected IgniteUuid leafParentId; + + /** The number of existing ids. */ + protected final int existingIdCnt; + + /** + * Creates the builder and performa all the initial calculations. + */ + protected DirectoryChainBuilder(IgfsPath path, + Map<String,String> middleProps, Map<String,String> leafProps) throws IgniteCheckedException { + this.path = path; + + this.components = path.components(); + + this.idList = fileIds(path); + + this.idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR); + + this.middleProps = middleProps; + + this.leafProps = leafProps; + // Store all the non-null ids in the set & construct existing path in one loop: + IgfsPath existingPath = path.root(); + + assert idList.size() == components.size() + 1; + + // Find the lowermost existing id: + IgniteUuid lowermostExistingId = null; + + int idIdx = 0; + + for (IgniteUuid id: idList) { + if (id == null) + break; + + lowermostExistingId = id; + + boolean added = idSet.add(id); + + assert added : "Not added id = " + id; + + if (idIdx >= 1) // skip root. + existingPath = new IgfsPath(existingPath, components.get(idIdx - 1)); + + idIdx++; + } + + assert idSet.contains(ROOT_ID); + + this.lowermostExistingId = lowermostExistingId; + + this.existingPath = existingPath; + + this.existingIdCnt = idSet.size(); + } + + /** + * Builds middle nodes. + */ + protected IgfsFileInfo buildMiddleNode(String childName, IgfsFileInfo childInfo) { + return new IgfsFileInfo(Collections.singletonMap(childName, + new IgfsListingEntry(childInfo)), middleProps); + } + + /** + * Builds leaf. + */ + protected IgfsFileInfo buildLeaf() { + return new IgfsFileInfo(true, leafProps); + } + + /** + * Links newly created chain to existing parent. + */ + final void linkBuiltChainToExistingParent(String childName, IgfsFileInfo childInfo) + throws IgniteCheckedException { + assert childInfo != null; + + id2InfoPrj.invoke(lowermostExistingId, + new UpdateListing(childName, new IgfsListingEntry(childInfo), false)); + } + + /** + * Does the main portion of job building the renmaining path. + */ + public final void doBuild() throws IgniteCheckedException { + IgfsFileInfo childInfo = null; + + String childName = null; + + IgfsFileInfo newLeafInfo; + IgniteUuid parentId = null; + + // This loop creates the missing directory chain from the bottom to the top: + for (int i = components.size() - 1; i >= existingIdCnt - 1; i--) { + // Required entry does not exist. + // Create new directory info: + if (childName == null) { + assert childInfo == null; + + newLeafInfo = buildLeaf(); + + assert newLeafInfo != null; + + leafInfo = newLeafInfo; + } + else { + assert childInfo != null; + + newLeafInfo = buildMiddleNode(childName, childInfo); + + assert newLeafInfo != null; + + if (parentId == null) + parentId = newLeafInfo.id(); + } + + boolean put = id2InfoPrj.putIfAbsent(newLeafInfo.id(), newLeafInfo); + + assert put; // Because we used a new id that should be unique. + + childInfo = newLeafInfo; + + childName = components.get(i); + } + + if (parentId == null) + parentId = lowermostExistingId; + + leafParentId = parentId; + + // Now link the newly created directory chain to the lowermost existing parent: + linkBuiltChainToExistingParent(childName, childInfo); + } + + /** + * Sends events. + */ + public final void sendEvents() { + if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) { + IgfsPath createdPath = existingPath; + + for (int i = existingPath.components().size(); i < components.size() - 1; i++) { + createdPath = new IgfsPath(createdPath, components.get(i)); + + IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPath, EVT_IGFS_DIR_CREATED); + } + } + + if (leafInfo.isDirectory()) + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_DIR_CREATED); + else { + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_CREATED); + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE); + } + } + } + + /** + * Processor closure to locks a file for writing. + */ + private static class LockFileProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** New lock id to lock the entry. */ + private IgniteUuid newLockId; + + /** + * Constructor. + */ + public LockFileProcessor(IgniteUuid newLockId) { + assert newLockId != null; + + this.newLockId = newLockId; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public LockFileProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override @Nullable public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, + Object... arguments) throws EntryProcessorException { + final IgfsFileInfo info = entry.getValue(); + + assert info != null; + + if (info.lockId() != null) + return null; // file is already locked. + + IgfsFileInfo newInfo = new IgfsFileInfo(info, newLockId, info.modificationTime()); + + entry.setValue(newInfo); + + return newInfo; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, newLockId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + newLockId = U.readGridUuid(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(LockFileProcessor.class, this); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java index c297eed..c9225ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java @@ -121,6 +121,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { if (fileInfo.lockId() == null) throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path); + assert !IgfsMetaManager.DELETE_LOCK_ID.equals(fileInfo.lockId()); + this.igfsCtx = igfsCtx; meta = igfsCtx.meta(); data = igfsCtx.data(); http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 50ebd56..07fdda4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -21,10 +21,15 @@ import java.lang.reflect.Constructor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; @@ -158,4 +163,22 @@ public class IgfsUtils { throw new IgniteCheckedException("Failed to perform operation since max number of attempts " + "exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']'); } + + + /** + * Sends a series of event. + * + * @param path The path of the created file. + * @param type The type of event to send. + */ + public static void sendEvents(GridKernalContext kernalCtx, IgfsPath path, int type) { + assert kernalCtx != null; + assert path != null; + + GridEventStorageManager evts = kernalCtx.event(); + ClusterNode locNode = kernalCtx.discovery().localNode(); + + if (evts.isRecordable(type)) + evts.record(new IgfsEvent(path, locNode, type)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java index f0f86ec..6ca75a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java @@ -683,7 +683,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest public void testSingleFileOverwrite() throws Exception { final List<Event> evtList = new ArrayList<>(); - final int evtsCnt = 3 + 4 + 1; + final int evtsCnt = 1 + 4 + 1; final CountDownLatch latch = new CountDownLatch(evtsCnt); @@ -703,7 +703,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest igfs.create(file, false).close(); // Will generate create, open and close events. - igfs.create(file, true).close(); // Will generate same event set + delete and purge events. + igfs.create(file, true).close(); // Will generate only OPEN_WRITE & close events. try { igfs.create(file, false).close(); // Won't generate any event. @@ -732,7 +732,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest assertEquals(0, evt.dataSize()); assertOneToOne( - evtList.subList(3, 8), + evtList.subList(3, evtsCnt), new P1<Event>() { @Override public boolean apply(Event e) { IgfsEvent e0 = (IgfsEvent)e;
