This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 986322dc361 MINOR: Moving the rollback out of lock in share partition 
(#20153)
986322dc361 is described below

commit 986322dc3619fc1725012ac33eebed7db8d8cbbb
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Jul 11 15:22:03 2025 +0100

    MINOR: Moving the rollback out of lock in share partition (#20153)
    
    Move the rollback out of the lock. If the persister returns an already
    completed future for the write state, the same data-plane-request-handler
    thread should not call the purgatory's safeTryAndComplete while holding
    the SharePartition's write lock.
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
     <[email protected]>
---
 core/src/main/java/kafka/server/share/SharePartition.java | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 8c4ac930d18..b063e090ae0 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -958,14 +958,12 @@ public class SharePartition {
                     break;
                 }
             }
-
-            // If the acknowledgement is successful then persist state, 
complete the state transition
-            // and update the cached state for start offset. Else rollback the 
state transition.
-            rollbackOrProcessStateUpdates(future, throwable, updatedStates, 
stateBatches);
         } finally {
             lock.writeLock().unlock();
         }
-
+        // If the acknowledgement is successful then persist state, complete 
the state transition
+        // and update the cached state for start offset. Else rollback the 
state transition.
+        rollbackOrProcessStateUpdates(future, throwable, updatedStates, 
stateBatches);
         return future;
     }
 
@@ -1014,13 +1012,12 @@ public class SharePartition {
                     break;
                 }
             }
-
-            // If the release acquired records is successful then persist 
state, complete the state transition
-            // and update the cached state for start offset. Else rollback the 
state transition.
-            rollbackOrProcessStateUpdates(future, throwable, updatedStates, 
stateBatches);
         } finally {
             lock.writeLock().unlock();
         }
+        // If the release acquired records is successful then persist state, 
complete the state transition
+        // and update the cached state for start offset. Else rollback the 
state transition.
+        rollbackOrProcessStateUpdates(future, throwable, updatedStates, 
stateBatches);
         return future;
     }
 

Reply via email to