Repository: nifi Updated Branches: refs/heads/master afc8c645a -> 4b74e4de7
http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java index 77048d1..b21ac8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java @@ -36,6 +36,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.FlowModification; import org.apache.nifi.web.InvalidRevisionException; import org.apache.nifi.web.Revision; @@ -61,6 +64,10 @@ public class NaiveRevisionManager implements RevisionManager { this(1, TimeUnit.MINUTES); } + public NaiveRevisionManager(final NiFiProperties properties) { + this(getRequestTimeoutMillis(properties), TimeUnit.MILLISECONDS); + } + /** * Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time * for a Revision Claims @@ -72,13 +79,20 @@ public class NaiveRevisionManager implements RevisionManager { this.claimExpirationNanos = timeUnit.toNanos(claimExpiration); } + private static long getRequestTimeoutMillis(final NiFiProperties properties) { + return FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT, + NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS); + } + @Override - public RevisionClaim requestClaim(Revision revision) throws InvalidRevisionException { - return requestClaim(Collections.singleton(revision)); + public RevisionClaim requestClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException { + Objects.requireNonNull(user); + return requestClaim(Collections.singleton(revision), user); } @Override - public RevisionClaim requestClaim(final Collection<Revision> revisions) { + public RevisionClaim requestClaim(final Collection<Revision> revisions, final NiFiUser user) { + Objects.requireNonNull(user); logger.debug("Attempting to claim Revisions {}", revisions); // Try to obtain a Revision Claim (temporary lock) on all revisions @@ -91,7 +105,7 @@ public class NaiveRevisionManager implements RevisionManager { final Revision revision = revisionList.get(i); final RevisionLock revisionLock = getRevisionLock(revision); - final ClaimResult claimResult = revisionLock.requestClaim(revision); + final ClaimResult claimResult = revisionLock.requestClaim(revision, user); logger.trace("Obtained Revision Claim for {}", revision); if (claimResult.isSuccessful()) { @@ -147,7 +161,8 @@ public class NaiveRevisionManager implements RevisionManager { } @Override - public <T> T deleteRevision(final RevisionClaim claim, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException { + public <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException { + Objects.requireNonNull(user); logger.debug("Attempting to delete revision using {}", claim); int successCount = 0; final List<Revision> revisionList = new ArrayList<>(claim.getRevisions()); @@ -156,7 +171,7 @@ public class NaiveRevisionManager implements RevisionManager { String failedId = null; for (final Revision revision : revisionList) { final RevisionLock revisionLock = getRevisionLock(revision); - final boolean verified = revisionLock.requestWriteLock(revision); + final boolean verified = revisionLock.requestWriteLock(revision, user); if (verified) { logger.trace("Verified Revision Claim for {}", revision); @@ -185,7 +200,7 @@ public class NaiveRevisionManager implements RevisionManager { for (int i = 0; i < successCount; i++) { final Revision revision = revisionList.get(i); final RevisionLock revisionLock = getRevisionLock(revision); - revisionLock.relinquishRevisionClaim(revision); + revisionLock.relinquishRevisionClaim(revision, null); logger.debug("Relinquished lock for {}", revision); } @@ -194,7 +209,8 @@ public class NaiveRevisionManager implements RevisionManager { } @Override - public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final String modifier, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException { + public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException { + Objects.requireNonNull(user); int successCount = 0; logger.debug("Attempting to update revision using {}", originalClaim); @@ -204,7 +220,7 @@ public class NaiveRevisionManager implements RevisionManager { String failedId = null; for (final Revision revision : revisionList) { final RevisionLock revisionLock = getRevisionLock(revision); - final boolean verified = revisionLock.requestWriteLock(revision); + final boolean verified = revisionLock.requestWriteLock(revision, user); if (verified) { logger.trace("Verified Revision Claim for {}", revision); @@ -247,7 +263,7 @@ public class NaiveRevisionManager implements RevisionManager { for (final Revision revision : revisionList) { final Revision updatedRevision = updatedRevisions.get(revision); - getRevisionLock(revision).unlock(revision, updatedRevision, modifier); + getRevisionLock(revision).unlock(revision, updatedRevision, user.getUserName()); if (updatedRevision.getVersion() != revision.getVersion()) { logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion()); @@ -274,7 +290,8 @@ public class NaiveRevisionManager implements RevisionManager { } @Override - public boolean releaseClaim(final RevisionClaim claim) { + public boolean releaseClaim(final RevisionClaim claim, final NiFiUser user) { + Objects.requireNonNull(user); boolean success = true; final List<Revision> revisions = new ArrayList<>(claim.getRevisions()); @@ -282,7 +299,7 @@ public class NaiveRevisionManager implements RevisionManager { for (final Revision revision : revisions) { final RevisionLock revisionLock = getRevisionLock(revision); - success = revisionLock.relinquishRevisionClaim(revision) && success; + success = revisionLock.relinquishRevisionClaim(revision, user) && success; } return success; @@ -299,14 +316,37 @@ public class NaiveRevisionManager implements RevisionManager { return false; } - return revisionLock.releaseClaimIfCurrentThread(); + return revisionLock.releaseClaimIfCurrentThread(null); + } + + @Override + public boolean cancelClaim(Revision revision) { + logger.debug("Attempting to cancel claim for {}", revision); + + final RevisionLock revisionLock = getRevisionLock(revision); + if (revisionLock == null) { + logger.debug("No Revision Lock exists for {} - there is no claim to cancel", revision); + return false; + } + + return revisionLock.releaseClaimIfCurrentThread(revision); + } + + @Override + public boolean cancelClaims(final Set<Revision> revisions) { + boolean successful = false; + for (final Revision revision : revisions) { + successful = cancelClaim(revision); + } + + return successful; } @Override public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) { final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos)); logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision()); - revisionLock.acquireReadLock(); + revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId()); logger.debug("Obtained read lock for {}", revisionLock.getRevision()); try { @@ -327,7 +367,7 @@ public class NaiveRevisionManager implements RevisionManager { for (final String componentId : sortedIds) { final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos)); logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision()); - revisionLock.acquireReadLock(); + revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId()); revisionLocks.push(revisionLock); logger.trace("Obtained read lock for {}", revisionLock.getRevision()); } @@ -376,9 +416,9 @@ public class NaiveRevisionManager implements RevisionManager { * * @return <code>true</code> if the Revision is valid and a Claim has been granted, <code>false</code> otherwise */ - public ClaimResult requestClaim(final Revision proposedRevision) { + public ClaimResult requestClaim(final Revision proposedRevision, final NiFiUser user) { // acquire the claim, blocking if necessary. - acquireClaim(proposedRevision.getClientId()); + acquireClaim(user, proposedRevision.getClientId()); threadLock.writeLock().lock(); try { @@ -408,7 +448,7 @@ public class NaiveRevisionManager implements RevisionManager { * @return <code>true</code> if the Revision Claim was upgraded to a lock, <code>false</code> otherwise * @throws ExpiredRevisionClaimException if the Revision Claim for the given Revision has already expired */ - public boolean requestWriteLock(final Revision proposedRevision) throws ExpiredRevisionClaimException { + public boolean requestWriteLock(final Revision proposedRevision, final NiFiUser user) throws ExpiredRevisionClaimException { Objects.requireNonNull(proposedRevision); threadLock.writeLock().lock(); @@ -423,26 +463,34 @@ public class NaiveRevisionManager implements RevisionManager { throw ise; } - if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) { - // TODO - Must make sure that we don't have an expired stamp if it is the result of another - // operation taking a long time. I.e., Client A fires off two requests for Component X. If the - // first one takes 2 minutes to complete, it should not result in the second request getting - // rejected. I.e., we want to ensure that if the request is received before the Claim expired, - // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended - // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does - // not fulfill the second phase of the two-phase commit. - // We may need a Queue of updates (queue would need to be bounded, with a request getting - // rejected if queue is full). - if (stamp.isExpired()) { - throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired"); - } + final boolean userEqual = stamp.getUser() == null || stamp.getUser().equals(user); + if (!userEqual) { + logger.debug("Failed to verify {} because the User was not the same as the Lock Stamp's User (Lock Stamp was {})", proposedRevision, stamp); + throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed by " + stamp.getUser()); + } - // Intentionally leave the thread lock in a locked state! - releaseLock = false; - return true; - } else { + final boolean clientIdEqual = stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId()); + if (!clientIdEqual) { logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp); + throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed with a different Client ID"); + } + + // TODO - Must make sure that we don't have an expired stamp if it is the result of another + // operation taking a long time. I.e., Client A fires off two requests for Component X. If the + // first one takes 2 minutes to complete, it should not result in the second request getting + // rejected. I.e., we want to ensure that if the request is received before the Claim expired, + // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended + // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does + // not fulfill the second phase of the two-phase commit. + // We may need a Queue of updates (queue would need to be bounded, with a request getting + // rejected if queue is full). + if (stamp.isExpired()) { + throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired"); } + + // Intentionally leave the thread lock in a locked state! + releaseLock = false; + return true; } } finally { if (releaseLock) { @@ -453,13 +501,13 @@ public class NaiveRevisionManager implements RevisionManager { return false; } - private void acquireClaim(final String clientId) { + private void acquireClaim(final NiFiUser user, final String clientId) { while (true) { final LockStamp stamp = lockStamp.get(); if (stamp == null || stamp.isExpired()) { final long now = System.nanoTime(); - final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(clientId, now + lockNanos)); + final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(user, clientId, now + lockNanos)); if (lockObtained) { return; } @@ -469,7 +517,7 @@ public class NaiveRevisionManager implements RevisionManager { } } - public void acquireReadLock() { + public void acquireReadLock(final NiFiUser user, final String clientId) { // Wait until we can claim the lock stamp boolean obtained = false; while (!obtained) { @@ -477,7 +525,7 @@ public class NaiveRevisionManager implements RevisionManager { // write lock held. Wait until it is null and then replace it atomically // with a LockStamp that does not expire (expiration time is Long.MAX_VALUE). final LockStamp curStamp = lockStamp.get(); - obtained = (curStamp == null || curStamp.isExpired()) && lockStamp.compareAndSet(curStamp, new LockStamp(null, Long.MAX_VALUE)); + obtained = (curStamp == null || curStamp.isExpired()) && lockStamp.compareAndSet(curStamp, new LockStamp(user, clientId, Long.MAX_VALUE)); if (!obtained) { // Could not obtain lock. Yield so that we don't sit @@ -499,9 +547,13 @@ public class NaiveRevisionManager implements RevisionManager { lockStamp.set(null); } - public boolean releaseClaimIfCurrentThread() { + public boolean releaseClaimIfCurrentThread(final Revision revision) { threadLock.writeLock().lock(); try { + if (revision != null && !getRevision().equals(revision)) { + throw new InvalidRevisionException("Cannot release claim because the provided Revision is not valid"); + } + final LockStamp stamp = lockStamp.get(); if (stamp == null) { logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision()); @@ -528,12 +580,17 @@ public class NaiveRevisionManager implements RevisionManager { * @param proposedRevision the proposed revision to check against the current revision * @return <code>true</code> if the Revision Claim was relinquished, <code>false</code> otherwise */ - public boolean relinquishRevisionClaim(final Revision proposedRevision) { + public boolean relinquishRevisionClaim(final Revision proposedRevision, final NiFiUser user) { threadLock.writeLock().lock(); try { - if (getRevision().equals(proposedRevision)) { - releaseClaim(); - return true; + final LockStamp stamp = lockStamp.get(); + if (stamp == null || stamp.getUser().equals(user)) { + if (getRevision().equals(proposedRevision)) { + releaseClaim(); + return true; + } + } else { + throw new InvalidRevisionException("Cannot relinquish claim for " + proposedRevision + " because it was claimed by " + stamp.getUser()); } return false; @@ -581,8 +638,18 @@ public class NaiveRevisionManager implements RevisionManager { */ public void renewExpiration(final long timestamp) { final LockStamp stamp = lockStamp.get(); - final String clientId = stamp == null ? null : stamp.getClientId(); - lockStamp.set(new LockStamp(clientId, timestamp)); + + final NiFiUser user; + final String clientId; + if (stamp == null) { + user = null; + clientId = null; + } else { + user = stamp.getUser(); + clientId = stamp.getClientId(); + } + + lockStamp.set(new LockStamp(user, clientId, timestamp)); } public Revision getRevision() { @@ -593,16 +660,22 @@ public class NaiveRevisionManager implements RevisionManager { private static class LockStamp { + private final NiFiUser user; private final String clientId; private final long expirationTimestamp; private final Thread obtainingThread; - public LockStamp(final String clientId, final long expirationTimestamp) { + public LockStamp(final NiFiUser user, final String clientId, final long expirationTimestamp) { + this.user = user; this.clientId = clientId; this.expirationTimestamp = expirationTimestamp; this.obtainingThread = Thread.currentThread(); } + public NiFiUser getUser() { + return user; + } + public String getClientId() { return clientId; } @@ -617,7 +690,7 @@ public class NaiveRevisionManager implements RevisionManager { @Override public String toString() { - return clientId; + return "LockStamp[user=" + user + ", clientId=" + clientId + "]"; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java index c9b750f..ae503a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionManager.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Set; import java.util.function.Supplier; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.web.InvalidRevisionException; import org.apache.nifi.web.Revision; @@ -82,12 +83,13 @@ public interface RevisionManager { * * @param revisions a Set of Revisions, each of which corresponds to a different * component for which a Claim is to be acquired. + * @param user the user for which the claim is being requested * * @return the Revision Claim that was granted, if one was granted. * * @throws InvalidRevisionException if any of the Revisions provided is out-of-date. */ - RevisionClaim requestClaim(Collection<Revision> revisions) throws InvalidRevisionException; + RevisionClaim requestClaim(Collection<Revision> revisions, NiFiUser user) throws InvalidRevisionException; /** * <p> @@ -96,12 +98,13 @@ public interface RevisionManager { * </p> * * @param revision the revision to request a claim for + * @param user the user for which the claim is being requested * * @return the Revision Claim that was granted, if one was granted. * * @throws InvalidRevisionException if any of the Revisions provided is out-of-date. */ - RevisionClaim requestClaim(Revision revision) throws InvalidRevisionException; + RevisionClaim requestClaim(Revision revision, NiFiUser user) throws InvalidRevisionException; /** * Returns the current Revision for the component with the given ID. If no Revision yet exists for the @@ -121,7 +124,7 @@ public interface RevisionManager { * * @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is * to be updated - * @param modifier the name of the entity that is modifying the resource + * @param modifier the user that is modifying the resource * @param task the task that is responsible for updating the components whose Revisions are claimed by the given * RevisionClaim. The returned Revision set should include a Revision for each Revision that is the * supplied Revision Claim. If there exists any Revision in the provided RevisionClaim that is not part @@ -131,7 +134,7 @@ public interface RevisionManager { * * @throws ExpiredRevisionClaimException if the Revision Claim has expired */ - <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, String modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException; + <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException; /** * Performs the given task that is expected to remove a component from the flow. As a result, @@ -139,12 +142,13 @@ public interface RevisionManager { * * @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is * to be removed + * @param user the user that is requesting that the revision be deleted * @param task the task that is responsible for deleting the components whose Revisions are claimed by the given RevisionClaim * @return the value returned from the DeleteRevisionTask * * @throws ExpiredRevisionClaimException if the Revision Claim has expired */ - <T> T deleteRevision(RevisionClaim claim, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException; + <T> T deleteRevision(RevisionClaim claim, NiFiUser user, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException; /** * Performs some operation to obtain an object of type T whose identifier is provided via @@ -173,11 +177,12 @@ public interface RevisionManager { * are up-to-date. * * @param claim the claim that holds the revisions + * @param user the user that is releasing the claim. Must be the same user that claimed the revision. * * @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not * up-to-date */ - boolean releaseClaim(RevisionClaim claim); + boolean releaseClaim(RevisionClaim claim, NiFiUser user); /** * Releases the claim on the revision for the given component if the claim was obtained by the calling thread @@ -186,4 +191,20 @@ public interface RevisionManager { * @return <code>true</code> if the claim was released, false otherwise */ boolean cancelClaim(String componentId); + + /** + * Releases the claim on the given revision if the claim was obtained by the calling thread + * + * @param revision the Revision to cancel the claim for + * @return <code>true</code> if the claim was released, false otherwise + */ + boolean cancelClaim(Revision revision); + + /** + * Releases the claims on the given revisions if the claim was obtained by the calling thread + * + * @param revisions the Revisions to cancel claims for + * @return <code>true</code> if all claims were released, false otherwise + */ + boolean cancelClaims(Set<Revision> revisions); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java index 1b43131..ad11884 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/revision/TestNaiveRevisionManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.web.FlowModification; import org.apache.nifi.web.InvalidRevisionException; import org.apache.nifi.web.Revision; @@ -45,6 +46,7 @@ import org.junit.Test; public class TestNaiveRevisionManager { private static final String CLIENT_1 = "client-1"; private static final String COMPONENT_1 = "component-1"; + private static final NiFiUser USER_1 = new NiFiUser("user-1"); private RevisionUpdate<Object> components(final Revision revision) { return new StandardRevisionUpdate<Object>(null, new FlowModification(revision, null)); @@ -70,10 +72,10 @@ public class TestNaiveRevisionManager { public void testTypicalFlow() throws ExpiredRevisionClaimException { final RevisionManager revisionManager = new NaiveRevisionManager(); final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1); assertNotNull(claim); - revisionManager.updateRevision(claim, "unit test", () -> components(new Revision(1L, CLIENT_1, COMPONENT_1))); + revisionManager.updateRevision(claim, USER_1, () -> components(new Revision(1L, CLIENT_1, COMPONENT_1))); final Revision updatedRevision = revisionManager.getRevision(originalRevision.getComponentId()); assertNotNull(updatedRevision); @@ -86,13 +88,13 @@ public class TestNaiveRevisionManager { public void testExpiration() throws InterruptedException { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MILLISECONDS); final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1); assertNotNull(claim); Thread.sleep(100); try { - revisionManager.updateRevision(claim, "unit test", () -> components(originalRevision, claim.getRevisions())); + revisionManager.updateRevision(claim, USER_1, () -> components(originalRevision, claim.getRevisions())); Assert.fail("Expected Revision Claim to have expired but it did not"); } catch (final ExpiredRevisionClaimException erce) { // expected @@ -103,12 +105,12 @@ public class TestNaiveRevisionManager { public void testConflictingClaimsFromDifferentClients() { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS); final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1); assertNotNull(claim); final Revision differentClientRevision = new Revision(0L, "client-2", COMPONENT_1); final long start = System.nanoTime(); - final RevisionClaim differentClientClaim = revisionManager.requestClaim(differentClientRevision); + final RevisionClaim differentClientClaim = revisionManager.requestClaim(differentClientRevision, USER_1); final long nanos = System.nanoTime() - start; // we should block for 2 seconds. But the timing won't necessarily be exact, @@ -139,7 +141,7 @@ public class TestNaiveRevisionManager { public void testGetWithReadLockAndContentionWithTimeout() { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS); final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1); assertNotNull(claim); final long start = System.nanoTime(); @@ -156,7 +158,7 @@ public class TestNaiveRevisionManager { public void testGetWithReadLockAndContentionWithEventualLockResolution() { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(originalRevision); + final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1); assertNotNull(claim); final Revision updatedRevision = new Revision(100L, CLIENT_1, COMPONENT_1); @@ -166,7 +168,7 @@ public class TestNaiveRevisionManager { @Override public void run() { try { - revisionManager.updateRevision(claim, "unit test", () -> { + revisionManager.updateRevision(claim, USER_1, () -> { // Wait 2 seconds and then return try { Thread.sleep(2000L); @@ -199,21 +201,21 @@ public class TestNaiveRevisionManager { public void testDeleteRevision() { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(firstClaim); final Revision secondRevision = new Revision(2L, CLIENT_1, COMPONENT_1); final FlowModification mod = new FlowModification(secondRevision, "unit test"); - revisionManager.updateRevision(firstClaim, "unit test", () -> new StandardRevisionUpdate<Void>(null, mod, null)); + revisionManager.updateRevision(firstClaim, USER_1, () -> new StandardRevisionUpdate<Void>(null, mod, null)); final Revision updatedRevision = revisionManager.getRevision(COMPONENT_1); assertEquals(secondRevision, updatedRevision); - final RevisionClaim secondClaim = revisionManager.requestClaim(updatedRevision); + final RevisionClaim secondClaim = revisionManager.requestClaim(updatedRevision, USER_1); assertNotNull(secondClaim); final Object obj = new Object(); - final Object ret = revisionManager.deleteRevision(secondClaim, () -> obj); + final Object ret = revisionManager.deleteRevision(secondClaim, USER_1, () -> obj); assertEquals(obj, ret); final Revision curRevision = revisionManager.getRevision(COMPONENT_1); @@ -228,11 +230,11 @@ public class TestNaiveRevisionManager { public void testSameClientDifferentRevisionsDoNotBlockEachOther() { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(firstClaim); final Revision secondRevision = new Revision(1L, CLIENT_1, "component-2"); - final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision); + final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision, USER_1); assertNotNull(secondClaim); } @@ -240,14 +242,14 @@ public class TestNaiveRevisionManager { public void testSameClientSameRevisionBlocks() throws InterruptedException, ExecutionException { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(firstClaim); final Revision secondRevision = new Revision(1L, CLIENT_1, COMPONENT_1); final Runnable runnable = new Runnable() { @Override public void run() { - revisionManager.requestClaim(secondRevision); + revisionManager.requestClaim(secondRevision, USER_1); } }; final ExecutorService exec = Executors.newFixedThreadPool(1); @@ -265,14 +267,83 @@ public class TestNaiveRevisionManager { public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() { final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(firstClaim); final Revision secondRevision = new Revision(1L, "client-2", "component-2"); - final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision); + final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision, USER_1); assertNotNull(secondClaim); } + + @Test + public void testDifferentUserCannotClaimWriteLock() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); + assertNotNull(firstClaim); + + final NiFiUser user2 = new NiFiUser("user-2"); + try { + revisionManager.updateRevision(firstClaim, user2, () -> null); + Assert.fail("Expected updateRevision to fail with a different user but it succeeded"); + } catch (final InvalidRevisionException ire) { + // Expected behavior + } + } + + @Test + public void testDifferentUserCannotDeleteRevision() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); + assertNotNull(firstClaim); + + final NiFiUser user2 = new NiFiUser("user-2"); + try { + revisionManager.deleteRevision(firstClaim, user2, () -> null); + Assert.fail("Expected deleteRevision to fail with a different user but it succeeded"); + } catch (final InvalidRevisionException ire) { + // Expected behavior + } + } + + @Test + public void testSameUserDifferentClientIdCannotDeleteRevision() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); + assertNotNull(firstClaim); + + final Revision differentClientId = new Revision(1L, "client-2", COMPONENT_1); + final RevisionClaim differentClaimIdClaim = new StandardRevisionClaim(differentClientId); + + try { + revisionManager.deleteRevision(differentClaimIdClaim, USER_1, () -> null); + Assert.fail("Expected deleteRevision to fail with a different user but it succeeded"); + } catch (final InvalidRevisionException ire) { + // Expected behavior + } + } + + @Test + public void testSameUserDifferentClientIdCannotClaimWriteLock() { + final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES); + final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); + final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1); + assertNotNull(firstClaim); + + final Revision differentClientId = new Revision(1L, "client-2", COMPONENT_1); + final RevisionClaim differentClaimIdClaim = new StandardRevisionClaim(differentClientId); + + try { + revisionManager.updateRevision(differentClaimIdClaim, USER_1, () -> null); + Assert.fail("Expected deleteRevision to fail with a different user but it succeeded"); + } catch (final InvalidRevisionException ire) { + // Expected behavior + } + } + @Test(timeout = 10000) public void testDifferentOrderedRevisionsDoNotCauseDeadlock() throws ExpiredRevisionClaimException, InterruptedException { // Because we block before obtaining a claim on a revision if another client has the revision claimed, @@ -288,7 +359,7 @@ public class TestNaiveRevisionManager { final Revision revision3c = new Revision(3L, "client-3", "c"); final Revision revision3a = new Revision(3L, "client-3", "a"); - final RevisionClaim claim1 = revisionManager.requestClaim(Arrays.asList(revision1a, revision1b)); + final RevisionClaim claim1 = revisionManager.requestClaim(Arrays.asList(revision1a, revision1b), USER_1); assertNotNull(claim1); final AtomicBoolean claim2Obtained = new AtomicBoolean(false); @@ -299,13 +370,13 @@ public class TestNaiveRevisionManager { new Thread(new Runnable() { @Override public void run() { - final RevisionClaim claim2 = revisionManager.requestClaim(Arrays.asList(revision2b, revision2c)); + final RevisionClaim claim2 = revisionManager.requestClaim(Arrays.asList(revision2b, revision2c), USER_1); assertNotNull(claim2); claim2Obtained.set(true); claim2Ref.set(claim2); try { - revisionManager.updateRevision(claim2, "unit test", () -> components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", "c"))); + revisionManager.updateRevision(claim2, USER_1, () -> components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", "c"))); } catch (ExpiredRevisionClaimException e) { Assert.fail("Revision unexpected expired"); } @@ -315,13 +386,13 @@ public class TestNaiveRevisionManager { new Thread(new Runnable() { @Override public void run() { - final RevisionClaim claim3 = revisionManager.requestClaim(Arrays.asList(revision3c, revision3a)); + final RevisionClaim claim3 = revisionManager.requestClaim(Arrays.asList(revision3c, revision3a), USER_1); assertNotNull(claim3); claim3Obtained.set(true); claim3Ref.set(claim3); try { - revisionManager.updateRevision(claim3Ref.get(), "unit test", () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, "client-3", "a"))); + revisionManager.updateRevision(claim3Ref.get(), USER_1, () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, "client-3", "a"))); } catch (ExpiredRevisionClaimException e) { Assert.fail("Revision unexpected expired"); } @@ -332,7 +403,7 @@ public class TestNaiveRevisionManager { assertFalse(claim2Obtained.get()); assertFalse(claim3Obtained.get()); - revisionManager.updateRevision(claim1, "unit test", () -> components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", "b"))); + revisionManager.updateRevision(claim1, USER_1, () -> components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", "b"))); Thread.sleep(250L); assertTrue(claim2Obtained.get() && claim3Obtained.get()); @@ -350,20 +421,20 @@ public class TestNaiveRevisionManager { public void testReleaseClaim() { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(firstRevision); + final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(claim); final RevisionClaim invalidClaim = new StandardRevisionClaim(new Revision(2L, "client-2", COMPONENT_1)); - assertFalse(revisionManager.releaseClaim(invalidClaim)); + assertFalse(revisionManager.releaseClaim(invalidClaim, USER_1)); - assertTrue(revisionManager.releaseClaim(claim)); + assertTrue(revisionManager.releaseClaim(claim, USER_1)); } @Test(timeout = 10000) public void testCancelClaimSameThread() { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(firstRevision); + final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(claim); assertFalse(revisionManager.cancelClaim("component-2")); @@ -374,7 +445,7 @@ public class TestNaiveRevisionManager { public void testCancelClaimDifferentThread() throws InterruptedException { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1); - final RevisionClaim claim = revisionManager.requestClaim(firstRevision); + final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1); assertNotNull(claim); final Thread t = new Thread(new Runnable() { @@ -396,12 +467,12 @@ public class TestNaiveRevisionManager { final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES); final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1); final Revision component2V1 = new Revision(1L, CLIENT_1, "component-2"); - final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1)); + final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1), USER_1); assertNotNull(claim); // Perform update but only update the revision for component-2 final Revision component1V2 = new Revision(2L, "client-2", COMPONENT_1); - revisionManager.updateRevision(claim, "unit test", new UpdateRevisionTask<Void>() { + revisionManager.updateRevision(claim, USER_1, new UpdateRevisionTask<Void>() { @Override public RevisionUpdate<Void> update() { return new StandardRevisionUpdate<>(null, new FlowModification(component1V2, "unit test")); @@ -410,14 +481,14 @@ public class TestNaiveRevisionManager { // Obtain a claim with correct revisions final Revision component2V2 = new Revision(2L, "client-2", "component-2"); - revisionManager.requestClaim(Arrays.asList(component1V2, component2V1)); + revisionManager.requestClaim(Arrays.asList(component1V2, component2V1), USER_1); // Attempt to update with incorrect revision for second component final RevisionClaim wrongClaim = new StandardRevisionClaim(component1V2, component2V2); final Revision component1V3 = new Revision(3L, CLIENT_1, COMPONENT_1); try { - revisionManager.updateRevision(wrongClaim, "unit test", + revisionManager.updateRevision(wrongClaim, USER_1, () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"), Collections.emptySet())); Assert.fail("Expected an Invalid Revision Exception"); } catch (final InvalidRevisionException ire) { @@ -425,14 +496,14 @@ public class TestNaiveRevisionManager { } // release claim should fail because we are passing the wrong revision for component 2 - assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2))); + assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2), USER_1)); // release claim should succeed because we are now using the proper revisions - assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1))); + assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1), USER_1)); // verify that we can update again. - final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1)); + final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1), USER_1); assertNotNull(thirdClaim); - revisionManager.updateRevision(thirdClaim, "unit test", () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"))); + revisionManager.updateRevision(thirdClaim, USER_1, () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"))); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml index 6e79f7b..289b763 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml @@ -31,7 +31,6 @@ <module>nifi-site-to-site</module> <module>nifi-framework-core</module> <module>nifi-framework-cluster-protocol</module> - <module>nifi-framework-cluster-web</module> <module>nifi-framework-cluster</module> <module>nifi-user-actions</module> <module>nifi-framework-authorization</module> http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/pom.xml index c1bdb08..27f7bd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml @@ -35,11 +35,6 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-cluster-web</artifactId> - <version>1.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> <artifactId>nifi-framework-cluster</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>