This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 986322dc361 MINOR: Moving the rollback out of lock in share partition
(#20153)
986322dc361 is described below
commit 986322dc3619fc1725012ac33eebed7db8d8cbbb
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Jul 11 15:22:03 2025 +0100
MINOR: Moving the rollback out of lock in share partition (#20153)
Moving rollback out of lock, if persister returns a completed future for
write state then same data-plane-request-handler thread should not call
purgatory safeTryAndComplete while holding SharePartition's write lock.
Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
<[email protected]>
---
core/src/main/java/kafka/server/share/SharePartition.java | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 8c4ac930d18..b063e090ae0 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -958,14 +958,12 @@ public class SharePartition {
break;
}
}
-
- // If the acknowledgement is successful then persist state,
complete the state transition
- // and update the cached state for start offset. Else rollback the
state transition.
- rollbackOrProcessStateUpdates(future, throwable, updatedStates,
stateBatches);
} finally {
lock.writeLock().unlock();
}
-
+ // If the acknowledgement is successful then persist state, complete
the state transition
+ // and update the cached state for start offset. Else rollback the
state transition.
+ rollbackOrProcessStateUpdates(future, throwable, updatedStates,
stateBatches);
return future;
}
@@ -1014,13 +1012,12 @@ public class SharePartition {
break;
}
}
-
- // If the release acquired records is successful then persist
state, complete the state transition
- // and update the cached state for start offset. Else rollback the
state transition.
- rollbackOrProcessStateUpdates(future, throwable, updatedStates,
stateBatches);
} finally {
lock.writeLock().unlock();
}
+ // If the release acquired records is successful then persist state,
complete the state transition
+ // and update the cached state for start offset. Else rollback the
state transition.
+ rollbackOrProcessStateUpdates(future, throwable, updatedStates,
stateBatches);
return future;
}