turcsanyip commented on code in PR #11275:
URL: https://github.com/apache/nifi/pull/11275#discussion_r3302274755


##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java:
##########
@@ -570,7 +571,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 }
             }
 
-        } catch (final IOException e) {
+        } catch (final IOException | ConcurrentModificationException e) {
             session.rollback();
             throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
         }

Review Comment:
   In the event of an optimistic lock failure, the `Wait` processor should retry the transaction silently rather than failing the FlowFile immediately (similar to the `for` loop and `MAX_REPLACE_RETRY_COUNT` in `WaitNotifyProtocol.notify()`).
   
   I see that the `Wait` flow is more complicated, so simply wrapping it in a `for` loop may not be possible.
   
   At a minimum, I would separate the exception handling:
   
   - the `Unable to communicate with cache` error message is not appropriate for the concurrent modification case
   - a warning log may be enough instead of an error
   
   The preferred approach would definitely be an internal retry with a maximum retry count.
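
The internal retry suggested above could look roughly like the following. This is only a sketch: `OptimisticRetrySketch`, `retryOnConflict`, and the value of `MAX_RETRY_COUNT` are hypothetical names chosen for illustration, not part of NiFi, and the real `Wait.onTrigger()` flow would need its own way of re-reading the signal inside the attempt.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.Callable;

public class OptimisticRetrySketch {

    // Hypothetical limit, in the spirit of MAX_REPLACE_RETRY_COUNT; the value is an assumption.
    static final int MAX_RETRY_COUNT = 5;

    /**
     * Runs the attempt and retries silently when it fails with an optimistic lock conflict.
     * Any other exception (for example an IOException from the cache) propagates immediately,
     * so communication errors and concurrent modifications are handled separately.
     */
    static <T> T retryOnConflict(final Callable<T> attempt) throws Exception {
        ConcurrentModificationException last = null;
        for (int i = 0; i < MAX_RETRY_COUNT; i++) {
            try {
                return attempt.call();
            } catch (final ConcurrentModificationException e) {
                last = e; // stale read: loop so the attempt can re-read and try again
            }
        }
        // Retries exhausted: the caller decides whether to roll back, warn, or fail.
        throw new ConcurrentModificationException(
                "Gave up after " + MAX_RETRY_COUNT + " attempts", last);
    }

    public static void main(final String[] args) throws Exception {
        final int[] calls = {0};
        // Simulated attempt: conflicts twice, then succeeds on the third call.
        final String result = retryOnConflict(() -> {
            if (++calls[0] < 3) {
                throw new ConcurrentModificationException("stale revision");
            }
            return "completed";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Keeping the conflict handling in one helper means the caller only sees a `ConcurrentModificationException` once the retry budget is spent, at which point a warning-level log may be sufficient.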
   
   



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java:
##########
@@ -281,22 +284,54 @@ public Signal getSignal(final String signalId) throws IOException, Deserializati
 
     /**
      * Finish protocol and remove the cache entry.
-     * @param signalId a key in the underlying cache engine
+     *
+     * <p>This method performs a best-effort version check before removing the entry. If the signal
+     * was concurrently modified by a Notify processor after the caller last read it, a
+     * {@link ConcurrentModificationException} is thrown so the caller can roll back and retry
+     * rather than silently discarding the concurrent notification.</p>
+     *
+     * <p>Note: there is a small inherent TOCTOU window between the version re-fetch and the
+     * remove call. A {@link AtomicDistributedMapCacheClient} API extension for atomic
+     * compare-and-delete would eliminate this entirely, but this approach covers the common case.</p>
+     *
+     * @param signal the Signal obtained from the most recent {@link #getSignal(String)} call;
+     *               its cached revision is used to detect concurrent modifications
      * @throws IOException thrown when it failed interacting with the cache engine
+     * @throws ConcurrentModificationException thrown if the signal was concurrently modified
+     *         since the caller last read it
      */
-    public void complete(final String signalId) throws IOException {
+    public void complete(final Signal signal) throws IOException, ConcurrentModificationException {
+        final String signalId = signal.identifier;
+
+        // Re-fetch to detect concurrent updates since the signal was last read.
+        final Signal current = getSignal(signalId);
+        if (current == null) {
+            return;

Review Comment:
   I think this case should also be treated as a concurrent modification and 
should throw an exception.
   
   Another process may have consumed and removed the signal after we initially 
read it, which means the value we based our decision on was no longer valid.
   
   Possible scenario:
   
   - signal counter = 1 initially
   - 2 concurrent threads read the signal
   - both decrease the counter to 0 and resume the waiting FlowFile
   - both remove the signal (as the counter reached 0)
   
   => only 1 FlowFile should have been resumed based on the counter
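
The scenario above can be replayed deterministically with a small simulation. This is a sketch under stated assumptions: the `HashMap` stands in for the distributed cache, `completeLenient` mimics the early `return` in the diff, and `completeStrict` is a hypothetical variant that treats a missing entry as a concurrent modification. None of these names exist in NiFi.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class MissingSignalRaceSketch {

    // Stand-in for the cache: signalId -> counter. Not the NiFi cache client.
    static final Map<String, Integer> CACHE = new HashMap<>();

    /** Mirrors the early return in the diff: a missing entry is silently ignored. */
    static void completeLenient(final String signalId) {
        CACHE.remove(signalId);
    }

    /** Hypothetical variant: a missing entry means someone else already consumed the signal. */
    static void completeStrict(final String signalId) {
        if (CACHE.remove(signalId) == null) {
            throw new ConcurrentModificationException(
                    "Signal [" + signalId + "] was removed concurrently");
        }
    }

    /** Replays the interleaving: both workers read counter = 1 before either completes. */
    static int resumedCount(final boolean strict) {
        CACHE.clear();
        CACHE.put("sig", 1);
        final Integer readA = CACHE.get("sig");
        final Integer readB = CACHE.get("sig");

        int resumed = 0;
        for (final Integer read : new Integer[] {readA, readB}) {
            if (read != null && read - 1 == 0) { // counter would reach 0, so complete the signal
                try {
                    if (strict) {
                        completeStrict("sig");
                    } else {
                        completeLenient("sig");
                    }
                    resumed++; // this worker releases its FlowFile
                } catch (final ConcurrentModificationException e) {
                    // A real caller would roll back here; the FlowFile keeps waiting.
                }
            }
        }
        return resumed;
    }

    public static void main(final String[] args) {
        // prints "lenient=2 strict=1"
        System.out.println("lenient=" + resumedCount(false) + " strict=" + resumedCount(true));
    }
}
```

With the lenient variant both workers resume a FlowFile even though the counter only allowed one; throwing on the missing entry leaves the second FlowFile waiting.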



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java:
##########
@@ -281,22 +284,54 @@ public Signal getSignal(final String signalId) throws IOException, Deserializati
 
     /**
      * Finish protocol and remove the cache entry.
-     * @param signalId a key in the underlying cache engine
+     *
+     * <p>This method performs a best-effort version check before removing the entry. If the signal
+     * was concurrently modified by a Notify processor after the caller last read it, a
+     * {@link ConcurrentModificationException} is thrown so the caller can roll back and retry
+     * rather than silently discarding the concurrent notification.</p>
+     *
+     * <p>Note: there is a small inherent TOCTOU window between the version re-fetch and the
+     * remove call. A {@link AtomicDistributedMapCacheClient} API extension for atomic
+     * compare-and-delete would eliminate this entirely, but this approach covers the common case.</p>
+     *
+     * @param signal the Signal obtained from the most recent {@link #getSignal(String)} call;
+     *               its cached revision is used to detect concurrent modifications
      * @throws IOException thrown when it failed interacting with the cache engine
+     * @throws ConcurrentModificationException thrown if the signal was concurrently modified
+     *         since the caller last read it
      */
-    public void complete(final String signalId) throws IOException {
+    public void complete(final Signal signal) throws IOException, ConcurrentModificationException {
+        final String signalId = signal.identifier;
+
+        // Re-fetch to detect concurrent updates since the signal was last read.
+        final Signal current = getSignal(signalId);
+        if (current == null) {
+            return;
+        }
+
+        final Object expectedRevision = signal.cachedEntry != null ? signal.cachedEntry.getRevision().orElse(null) : null;
+        final Object actualRevision = current.cachedEntry.getRevision().orElse(null);
+        if (expectedRevision != null && !expectedRevision.equals(actualRevision)) {
+            throw new ConcurrentModificationException(String.format(
+                    "Failed to complete signal [%s]: signal was concurrently modified (expected revision %s, found %s). Will retry.",

Review Comment:
   It depends on the caller whether it retries the transaction or gives up, so 
I suggest removing the "Will retry." suffix.



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java:
##########
@@ -281,22 +284,54 @@ public Signal getSignal(final String signalId) throws IOException, Deserializati
 
     /**
      * Finish protocol and remove the cache entry.
-     * @param signalId a key in the underlying cache engine
+     *
+     * <p>This method performs a best-effort version check before removing the entry. If the signal
+     * was concurrently modified by a Notify processor after the caller last read it, a
+     * {@link ConcurrentModificationException} is thrown so the caller can roll back and retry
+     * rather than silently discarding the concurrent notification.</p>
+     *
+     * <p>Note: there is a small inherent TOCTOU window between the version re-fetch and the
+     * remove call. A {@link AtomicDistributedMapCacheClient} API extension for atomic
+     * compare-and-delete would eliminate this entirely, but this approach covers the common case.</p>
+     *
+     * @param signal the Signal obtained from the most recent {@link #getSignal(String)} call;
+     *               its cached revision is used to detect concurrent modifications
      * @throws IOException thrown when it failed interacting with the cache engine
+     * @throws ConcurrentModificationException thrown if the signal was concurrently modified
+     *         since the caller last read it
      */
-    public void complete(final String signalId) throws IOException {
+    public void complete(final Signal signal) throws IOException, ConcurrentModificationException {
+        final String signalId = signal.identifier;
+
+        // Re-fetch to detect concurrent updates since the signal was last read.
+        final Signal current = getSignal(signalId);
+        if (current == null) {
+            return;
+        }
+
+        final Object expectedRevision = signal.cachedEntry != null ? signal.cachedEntry.getRevision().orElse(null) : null;
+        final Object actualRevision = current.cachedEntry.getRevision().orElse(null);
+        if (expectedRevision != null && !expectedRevision.equals(actualRevision)) {
+            throw new ConcurrentModificationException(String.format(
+                    "Failed to complete signal [%s]: signal was concurrently modified (expected revision %s, found %s). Will retry.",
+                    signalId, expectedRevision, actualRevision));
+        }
+
         cache.remove(signalId, stringSerializer);
     }
 
-    public boolean replace(final Signal signal) throws IOException {
+    public void replace(final Signal signal) throws IOException, ConcurrentModificationException {
 
         final String signalJson = objectMapper.writeValueAsString(signal);
         if (signal.cachedEntry == null) {
             signal.cachedEntry = new AtomicCacheEntry<>(signal.identifier, signalJson, null);
         } else {
             signal.cachedEntry.setValue(signalJson);
         }
-        return cache.replace(signal.cachedEntry, stringSerializer, stringSerializer);
+        if (!cache.replace(signal.cachedEntry, stringSerializer, stringSerializer)) {
+            throw new ConcurrentModificationException(String.format(
+                    "Failed to update signal [%s] in cache due to concurrent modification. Will retry.", signal.identifier));

Review Comment:
   Same as line 316: whether to retry is up to the caller, so I suggest removing the "Will retry." suffix here as well.


