IGNITE-2846: IGFS: Reworked IgfsMetaManager.updateInfo() operation to use "invoke" instead of "put".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1efc5a0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1efc5a0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1efc5a0 Branch: refs/heads/ignite-2407 Commit: a1efc5a06b15acffa40ad0a9d3352206061b42f6 Parents: dfe5ea8 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Mar 16 13:22:07 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Mar 16 13:22:07 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/igfs/IgfsFileMap.java | 9 +- .../igfs/IgfsFragmentizerManager.java | 166 ++++++++++++++----- .../igfs/IgfsInvalidRangeException.java | 4 +- .../processors/igfs/IgfsMetaManager.java | 67 ++++---- .../igfs/IgfsMetaManagerSelfTest.java | 11 -- 5 files changed, 161 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java index 2c0358b..9ea69ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java @@ -128,12 +128,11 @@ public class IgfsFileMap implements Externalizable { * * @param range Range to update status. * @param status New range status. - * @throws IgniteCheckedException If range was not found. */ - public void updateRangeStatus(IgfsFileAffinityRange range, int status) throws IgniteCheckedException { + public void updateRangeStatus(IgfsFileAffinityRange range, int status) { if (ranges == null) throw new IgfsInvalidRangeException("Failed to update range status (file map is empty) " + - "[range=" + range + ", ranges=" + ranges + ']'); + "[range=" + range + ", ranges=null]"); assert !ranges.isEmpty(); @@ -190,10 +189,10 @@ public class IgfsFileMap implements Externalizable { * * @param range Range to delete. */ - public void deleteRange(IgfsFileAffinityRange range) throws IgniteCheckedException { + public void deleteRange(IgfsFileAffinityRange range) { if (ranges == null) throw new IgfsInvalidRangeException("Failed to remove range (file map is empty) " + - "[range=" + range + ", ranges=" + ranges + ']'); + "[range=" + range + ", ranges=null]"); assert !ranges.isEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java index 899730d..7cc5cb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.processors.igfs; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -41,19 +45,22 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; + import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -286,7 +293,8 @@ public class IgfsFragmentizerManager extends IgfsManager { switch (range.status()) { case RANGE_STATUS_INITIAL: { // Mark range as moving. - updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVING)); + updated = igfsCtx.meta().updateInfo( + fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVING)); if (updated == null) { igfsCtx.data().cleanBlocks(fileInfo, range, true); @@ -302,7 +310,8 @@ public class IgfsFragmentizerManager extends IgfsManager { igfsCtx.data().spreadBlocks(fileInfo, range); // Mark range as moved. - updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVED)); + updated = igfsCtx.meta().updateInfo( + fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVED)); if (updated == null) { igfsCtx.data().cleanBlocks(fileInfo, range, true); @@ -318,7 +327,7 @@ public class IgfsFragmentizerManager extends IgfsManager { igfsCtx.data().cleanBlocks(fileInfo, range, false); // Remove range from map. - updated = igfsCtx.meta().updateInfo(fileId, deleteRange(range)); + updated = igfsCtx.meta().updateInfo(fileId, new RangeDeleteProcessor(range)); if (updated == null) igfsCtx.data().cleanBlocks(fileInfo, range, true); @@ -334,57 +343,132 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** - * Creates update info closure that will mark given range as moving. - * - * @param range Range to mark as moving. - * @param status Status. - * @return Update closure. + * Update range processor. */ - private IgniteClosure<IgfsFileInfo, IgfsFileInfo> updateRange(final IgfsFileAffinityRange range, - final int status) { - return new CX1<IgfsFileInfo, IgfsFileInfo>() { - @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException { - IgfsFileMap map = new IgfsFileMap(info.fileMap()); + private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; - map.updateRangeStatus(range, status); + /** Range. */ + private IgfsFileAffinityRange range; - if (log.isDebugEnabled()) - log.debug("Updated file map for range [fileId=" + info.id() + ", range=" + range + - ", status=" + status + ", oldMap=" + info.fileMap() + ", newMap=" + map + ']'); + /** Status. */ + private int status; - IgfsFileInfo updated = new IgfsFileInfo(info, info.length()); + /** + * Constructor. + */ + public RangeUpdateProcessor() { + // No-op. + } - updated.fileMap(map); + /** + * Constructor. + * + * @param range Range. + * @param status Status. + */ + public RangeUpdateProcessor(IgfsFileAffinityRange range, int status) { + this.range = range; + this.status = status; + } - return updated; - } - }; + /** {@inheritDoc} */ + @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args) + throws EntryProcessorException { + IgfsFileInfo oldInfo = entry.getValue(); + + IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); + + newMap.updateRangeStatus(range, status); + + IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length()); + + newInfo.fileMap(newMap); + + entry.setValue(newInfo); + + return newInfo; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(range); + out.writeInt(status); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + range = (IgfsFileAffinityRange)in.readObject(); + status = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RangeUpdateProcessor.class, this); + } } /** - * Creates update info closure that will mark given range as moving. - * - * @param range Range to mark as moving. - * @return Update closure. + * Delete range processor. */ - private IgniteClosure<IgfsFileInfo, IgfsFileInfo> deleteRange(final IgfsFileAffinityRange range) { - return new CX1<IgfsFileInfo, IgfsFileInfo>() { - @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException { - IgfsFileMap map = new IgfsFileMap(info.fileMap()); + private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; - map.deleteRange(range); + /** Range. */ + private IgfsFileAffinityRange range; - if (log.isDebugEnabled()) - log.debug("Deleted range from file map [fileId=" + info.id() + ", range=" + range + - ", oldMap=" + info.fileMap() + ", newMap=" + map + ']'); + /** + * Constructor. + */ + public RangeDeleteProcessor() { + // No-op. + } - IgfsFileInfo updated = new IgfsFileInfo(info, info.length()); + /** + * Constructor. + * + * @param range Range. + */ + public RangeDeleteProcessor(IgfsFileAffinityRange range) { + this.range = range; + } - updated.fileMap(map); + /** {@inheritDoc} */ + @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args) + throws EntryProcessorException { + IgfsFileInfo oldInfo = entry.getValue(); - return updated; - } - }; + IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); + + newMap.deleteRange(range); + + IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length()); + + newInfo.fileMap(newMap); + + entry.setValue(newInfo); + + return newInfo; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(range); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + range = (IgfsFileAffinityRange)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RangeDeleteProcessor.class, this); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java index cd93278..d6ad2b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.igfs; -import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; /** * Internal exception thrown when attempted to update range that is no longer present * in file affinity map. */ -public class IgfsInvalidRangeException extends IgniteCheckedException { +public class IgfsInvalidRangeException extends IgniteException { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 2a85cf8..b4774f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -1257,7 +1257,7 @@ public class IgfsMetaManager extends IgfsManager { // Remove listing entries from root. for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet()) - id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId())); + id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemoveProcessor(entry.getKey(), entry.getValue().fileId())); resId = newInfo.id(); } @@ -1408,7 +1408,7 @@ public class IgfsMetaManager extends IgfsManager { IgfsListingEntry childEntry = parentInfo.listing().get(name); if (childEntry != null) - id2InfoPrj.invoke(parentId, new ListingRemove(name, id)); + id2InfoPrj.invoke(parentId, new ListingRemoveProcessor(name, id)); id2InfoPrj.remove(id); @@ -1584,20 +1584,20 @@ public class IgfsMetaManager extends IgfsManager { * Update file info in cache. * * @param fileId File ID to update information for. - * @param c Closure to update file's info inside transaction. + * @param proc Entry processor to invoke. * @return Updated file info or {@code null} if such file ID not found. * @throws IgniteCheckedException If failed. */ - @Nullable public IgfsFileInfo updateInfo(IgniteUuid fileId, IgniteClosure<IgfsFileInfo, IgfsFileInfo> c) - throws IgniteCheckedException { + @Nullable public IgfsFileInfo updateInfo(IgniteUuid fileId, + EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo> proc) throws IgniteCheckedException { validTxState(false); assert fileId != null; - assert c != null; + assert proc != null; if (busyLock.enterBusy()) { try { if (log.isDebugEnabled()) - log.debug("Update file info [fileId=" + fileId + ", c=" + c + ']'); + log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']'); IgniteInternalTx tx = startTx(); @@ -1608,27 +1608,21 @@ public class IgfsMetaManager extends IgfsManager { if (oldInfo == null) return null; // File not found. - IgfsFileInfo newInfo = c.apply(oldInfo); + IgfsFileInfo newInfo = invokeAndGet(fileId, proc); if (newInfo == null) throw fsException("Failed to update file info with null value" + - " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']'); if (!oldInfo.id().equals(newInfo.id())) throw fsException("Failed to update file info (file IDs differ)" + - " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']'); if (oldInfo.isDirectory() != newInfo.isDirectory()) throw fsException("Failed to update file info (file types differ)" + - " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); - - boolean b = id2InfoPrj.replace(fileId, oldInfo, newInfo); - - assert b : "Inconsistent transaction state [oldInfo=" + oldInfo + ", newInfo=" + newInfo + - ", c=" + c + ']'; + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']'); - if (tx != null) - tx.commit(); + tx.commit(); return newInfo; } @@ -1636,8 +1630,7 @@ public class IgfsMetaManager extends IgfsManager { throw U.cast(e); } finally { - if (tx != null) - tx.close(); + tx.close(); } } finally { @@ -1814,7 +1807,7 @@ public class IgfsMetaManager extends IgfsManager { throw fsException("Failed to create new metadata entry due to ID conflict: " + info.id()); if (parentId != null) - id2InfoPrj.invoke(parentId, new ListingAdd(name, new IgfsListingEntry(info))); + id2InfoPrj.invoke(parentId, new ListingAddProcessor(name, new IgfsListingEntry(info))); } /** @@ -1831,8 +1824,8 @@ public class IgfsMetaManager extends IgfsManager { IgniteUuid destId, String destName) throws IgniteCheckedException { validTxState(true); - id2InfoPrj.invoke(srcId, new ListingRemove(srcName, entry.fileId())); - id2InfoPrj.invoke(destId, new ListingAdd(destName, entry)); + id2InfoPrj.invoke(srcId, new ListingRemoveProcessor(srcName, entry.fileId())); + id2InfoPrj.invoke(destId, new ListingAddProcessor(destName, entry)); } /** @@ -1857,7 +1850,7 @@ public class IgfsMetaManager extends IgfsManager { private void invokeUpdatePath(IgniteUuid id, IgfsPath path) throws IgniteCheckedException { validTxState(true); - id2InfoPrj.invoke(id, new UpdatePath(path)); + id2InfoPrj.invoke(id, new UpdatePathProcessor(path)); } /** @@ -2009,7 +2002,7 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.remove(oldId); // Remove the old one. id2InfoPrj.invoke(parentInfo.id(), - new ListingRemove(path.name(), parentInfo.listing().get(path.name()).fileId())); + new ListingRemoveProcessor(path.name(), parentInfo.listing().get(path.name()).fileId())); createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one. @@ -3101,7 +3094,7 @@ public class IgfsMetaManager extends IgfsManager { * Remove entry from directory listing. */ @GridInternal - private static final class ListingRemove implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, + private static final class ListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -3115,7 +3108,7 @@ public class IgfsMetaManager extends IgfsManager { /** * Default constructor. */ - public ListingRemove() { + public ListingRemoveProcessor() { // No-op. } @@ -3125,7 +3118,7 @@ public class IgfsMetaManager extends IgfsManager { * @param fileName File name. * @param fileId File ID. */ - public ListingRemove(String fileName, IgniteUuid fileId) { + public ListingRemoveProcessor(String fileName, IgniteUuid fileId) { this.fileName = fileName; this.fileId = fileId; } @@ -3173,7 +3166,7 @@ public class IgfsMetaManager extends IgfsManager { * Update directory listing closure. */ @GridInternal - private static final class ListingAdd implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, + private static final class ListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -3190,7 +3183,7 @@ public class IgfsMetaManager extends IgfsManager { * @param fileName File name to add into parent listing. * @param entry Listing entry to add or remove. */ - private ListingAdd(String fileName, IgfsListingEntry entry) { + private ListingAddProcessor(String fileName, IgfsListingEntry entry) { assert fileName != null; assert entry != null; @@ -3202,7 +3195,7 @@ public class IgfsMetaManager extends IgfsManager { * Empty constructor required for {@link Externalizable}. * */ - public ListingAdd() { + public ListingAddProcessor() { // No-op. } @@ -3242,7 +3235,7 @@ public class IgfsMetaManager extends IgfsManager { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(ListingAdd.class, this); + return S.toString(ListingAddProcessor.class, this); } } @@ -3250,7 +3243,7 @@ public class IgfsMetaManager extends IgfsManager { * Update path closure. */ @GridInternal - private static final class UpdatePath implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, + private static final class UpdatePathProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -3261,14 +3254,14 @@ public class IgfsMetaManager extends IgfsManager { /** * @param path Path. */ - private UpdatePath(IgfsPath path) { + private UpdatePathProcessor(IgfsPath path) { this.path = path; } /** * Default constructor (required by Externalizable). */ - public UpdatePath() { + public UpdatePathProcessor() { // No-op. } @@ -3293,7 +3286,7 @@ public class IgfsMetaManager extends IgfsManager { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(UpdatePath.class, this); + return S.toString(UpdatePathProcessor.class, this); } } @@ -3665,7 +3658,7 @@ public class IgfsMetaManager extends IgfsManager { leafParentId = parentId; // Now link the newly created directory chain to the lowermost existing parent: - id2InfoPrj.invoke(lowermostExistingId, new ListingAdd(childName, childInfo)); + id2InfoPrj.invoke(lowermostExistingId, new ListingAddProcessor(childName, childInfo)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java index 72a2bee..19a91ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java @@ -352,17 +352,6 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest { assertEmpty(mgr.directoryListing(b.id())); - // Validate last actual data received from 'remove' operation. - IgfsFileInfo newF2 = mgr.updateInfo(f2.id(), new C1<IgfsFileInfo, IgfsFileInfo>() { - @Override public IgfsFileInfo apply(IgfsFileInfo e) { - return new IgfsFileInfo(e, e.length() + 20); - } - }); - - assertNotNull(newF2); - assertEquals(f2.id(), newF2.id()); - assertNotSame(f2, newF2); - del = mgr.softDelete(path("/a/f2"), false); assertEquals(f2.id(), del);