apoorvmittal10 commented on code in PR #17957:
URL: https://github.com/apache/kafka/pull/17957#discussion_r1863416832
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -389,79 +389,84 @@ public CompletableFuture<Void> maybeInitialize() {
.build())
.build()
).whenComplete((result, exception) -> {
- if (exception != null) {
- log.error("Failed to initialize the share partition:
{}-{}", groupId, topicIdPartition, exception);
- completeInitializationWithException(future, exception);
- return;
- }
+ lock.writeLock().lock();
Review Comment:
Why not release the lock after setting `partitionState =
SharePartitionState.INITIALIZING;` and re-acquire it later?
The method has a double-checked lock, where
`maybeCompleteInitialization(future)` is checked again to reduce lock overhead,
as the `EMPTY` state should be temporary at bootstrap. We should release the
lock after setting the state and re-acquire it when the persister sends the
response back. It would be cleaner that way.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1647,24 +1652,29 @@ void rollbackOrProcessStateUpdates(
}
writeShareGroupState(stateBatches).whenComplete((result,
exception) -> {
- if (exception != null) {
- log.error("Failed to write state to persister for the
share partition: {}-{}",
- groupId, topicIdPartition, exception);
- updatedStates.forEach(state ->
state.completeStateTransition(false));
- future.completeExceptionally(exception);
- return;
- }
+ lock.writeLock().lock();
+ try {
+ if (exception != null) {
+ log.error("Failed to write state to persister for the
share partition: {}-{}",
+ groupId, topicIdPartition, exception);
+ updatedStates.forEach(state ->
state.completeStateTransition(false));
+ future.completeExceptionally(exception);
+ return;
+ }
- log.trace("State change request successful for share
partition: {}-{}",
- groupId, topicIdPartition);
- updatedStates.forEach(state -> {
- state.completeStateTransition(true);
- // Cancel the acquisition lock timeout task for the state
since it is acknowledged/released successfully.
- state.cancelAndClearAcquisitionLockTimeoutTask();
- });
- // Update the cached state and start and end offsets after
acknowledging/releasing the acquired records.
- maybeUpdateCachedStateAndOffsets();
- future.complete(null);
+ log.trace("State change request successful for share
partition: {}-{}",
+ groupId, topicIdPartition);
+ updatedStates.forEach(state -> {
+ state.completeStateTransition(true);
+ // Cancel the acquisition lock timeout task for the
state since it is acknowledged/released successfully.
+ state.cancelAndClearAcquisitionLockTimeoutTask();
+ });
+ // Update the cached state and start and end offsets after
acknowledging/releasing the acquired records.
+ maybeUpdateCachedStateAndOffsets();
+ future.complete(null);
+ } finally {
+ lock.writeLock().unlock();
+ }
});
} finally {
lock.writeLock().unlock();
Review Comment:
Again, I would suggest decoupling the two lock acquisitions and releasing the
first one just after
`updatedStates.forEach(state -> state.completeStateTransition(false));`,
as only that part needs to be done under the lock.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -389,79 +389,84 @@ public CompletableFuture<Void> maybeInitialize() {
.build())
.build()
).whenComplete((result, exception) -> {
- if (exception != null) {
- log.error("Failed to initialize the share partition:
{}-{}", groupId, topicIdPartition, exception);
- completeInitializationWithException(future, exception);
- return;
- }
+ lock.writeLock().lock();
+ try {
+ if (exception != null) {
+ log.error("Failed to initialize the share partition:
{}-{}", groupId, topicIdPartition, exception);
+ completeInitializationWithException(future, exception);
+ return;
+ }
- if (result == null || result.topicsData() == null ||
result.topicsData().size() != 1) {
- log.error("Failed to initialize the share partition:
{}-{}. Invalid state found: {}.",
- groupId, topicIdPartition, result);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
- return;
- }
+ if (result == null || result.topicsData() == null ||
result.topicsData().size() != 1) {
+ log.error("Failed to initialize the share partition:
{}-{}. Invalid state found: {}.",
+ groupId, topicIdPartition, result);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
- TopicData<PartitionAllData> state = result.topicsData().get(0);
- if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1) {
- log.error("Failed to initialize the share partition:
{}-{}. Invalid topic partition response: {}.",
- groupId, topicIdPartition, result);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
- return;
- }
+ TopicData<PartitionAllData> state =
result.topicsData().get(0);
+ if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1) {
+ log.error("Failed to initialize the share partition:
{}-{}. Invalid topic partition response: {}.",
+ groupId, topicIdPartition, result);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
- PartitionAllData partitionData = state.partitions().get(0);
- if (partitionData.partition() != topicIdPartition.partition())
{
- log.error("Failed to initialize the share partition:
{}-{}. Invalid partition response: {}.",
- groupId, topicIdPartition, partitionData);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
- return;
- }
+ PartitionAllData partitionData = state.partitions().get(0);
+ if (partitionData.partition() !=
topicIdPartition.partition()) {
+ log.error("Failed to initialize the share partition:
{}-{}. Invalid partition response: {}.",
+ groupId, topicIdPartition, partitionData);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
- if (partitionData.errorCode() != Errors.NONE.code()) {
- KafkaException ex =
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
- log.error("Failed to initialize the share partition:
{}-{}. Exception occurred: {}.",
- groupId, topicIdPartition, partitionData);
- completeInitializationWithException(future, ex);
- return;
- }
+ if (partitionData.errorCode() != Errors.NONE.code()) {
+ KafkaException ex =
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
+ log.error("Failed to initialize the share partition:
{}-{}. Exception occurred: {}.",
+ groupId, topicIdPartition, partitionData);
+ completeInitializationWithException(future, ex);
+ return;
+ }
- try {
- startOffset =
startOffsetDuringInitialization(partitionData.startOffset());
- } catch (Exception e) {
- completeInitializationWithException(future, e);
- return;
- }
- stateEpoch = partitionData.stateEpoch();
-
- List<PersisterStateBatch> stateBatches =
partitionData.stateBatches();
- for (PersisterStateBatch stateBatch : stateBatches) {
- if (stateBatch.firstOffset() < startOffset) {
- log.error("Invalid state batch found for the share
partition: {}-{}. The base offset: {}"
- + " is less than the start offset: {}.",
groupId, topicIdPartition,
- stateBatch.firstOffset(), startOffset);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ try {
+ startOffset =
startOffsetDuringInitialization(partitionData.startOffset());
+ } catch (Exception e) {
+ completeInitializationWithException(future, e);
return;
}
- InFlightBatch inFlightBatch = new
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
- stateBatch.lastOffset(),
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
null);
- cachedState.put(stateBatch.firstOffset(), inFlightBatch);
- }
- // Update the endOffset of the partition.
- if (!cachedState.isEmpty()) {
- // If the cachedState is not empty, findNextFetchOffset
flag is set to true so that any AVAILABLE records
- // in the cached state are not missed
- findNextFetchOffset.set(true);
- endOffset =
cachedState.lastEntry().getValue().lastOffset();
- // In case the persister read state RPC result contains no
AVAILABLE records, we can update cached state
- // and start/end offsets.
- maybeUpdateCachedStateAndOffsets();
- } else {
- endOffset = startOffset;
+ stateEpoch = partitionData.stateEpoch();
+
+ List<PersisterStateBatch> stateBatches =
partitionData.stateBatches();
+ for (PersisterStateBatch stateBatch : stateBatches) {
+ if (stateBatch.firstOffset() < startOffset) {
+ log.error("Invalid state batch found for the share
partition: {}-{}. The base offset: {}"
+ + " is less than the start offset: {}.",
groupId, topicIdPartition,
+ stateBatch.firstOffset(), startOffset);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
+ InFlightBatch inFlightBatch = new
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
+ stateBatch.lastOffset(),
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
null);
+ cachedState.put(stateBatch.firstOffset(),
inFlightBatch);
+ }
+ // Update the endOffset of the partition.
+ if (!cachedState.isEmpty()) {
+ // If the cachedState is not empty,
findNextFetchOffset flag is set to true so that any AVAILABLE records
+ // in the cached state are not missed
+ findNextFetchOffset.set(true);
+ endOffset =
cachedState.lastEntry().getValue().lastOffset();
+ // In case the persister read state RPC result
contains no AVAILABLE records, we can update cached state
+ // and start/end offsets.
+ maybeUpdateCachedStateAndOffsets();
+ } else {
+ endOffset = startOffset;
+ }
+ // Set the partition state to Active and complete the
future.
+ partitionState = SharePartitionState.ACTIVE;
Review Comment:
@adixitconfluent Just to be sure, there is no code change other than
indentation here, correct?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -389,79 +389,84 @@ public CompletableFuture<Void> maybeInitialize() {
.build())
.build()
).whenComplete((result, exception) -> {
- if (exception != null) {
- log.error("Failed to initialize the share partition:
{}-{}", groupId, topicIdPartition, exception);
- completeInitializationWithException(future, exception);
- return;
- }
+ lock.writeLock().lock();
+ try {
+ if (exception != null) {
+ log.error("Failed to initialize the share partition:
{}-{}", groupId, topicIdPartition, exception);
+ completeInitializationWithException(future, exception);
+ return;
+ }
- if (result == null || result.topicsData() == null ||
result.topicsData().size() != 1) {
- log.error("Failed to initialize the share partition:
{}-{}. Invalid state found: {}.",
- groupId, topicIdPartition, result);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
- return;
- }
+ if (result == null || result.topicsData() == null ||
result.topicsData().size() != 1) {
+ log.error("Failed to initialize the share partition:
{}-{}. Invalid state found: {}.",
+ groupId, topicIdPartition, result);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
- TopicData<PartitionAllData> state = result.topicsData().get(0);
- if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1) {
- log.error("Failed to initialize the share partition:
{}-{}. Invalid topic partition response: {}.",
- groupId, topicIdPartition, result);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
- return;
- }
+ TopicData<PartitionAllData> state =
result.topicsData().get(0);
+ if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1) {
+ log.error("Failed to initialize the share partition:
{}-{}. Invalid topic partition response: {}.",
+ groupId, topicIdPartition, result);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
- PartitionAllData partitionData = state.partitions().get(0);
- if (partitionData.partition() != topicIdPartition.partition())
{
- log.error("Failed to initialize the share partition:
{}-{}. Invalid partition response: {}.",
- groupId, topicIdPartition, partitionData);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
- return;
- }
+ PartitionAllData partitionData = state.partitions().get(0);
+ if (partitionData.partition() !=
topicIdPartition.partition()) {
+ log.error("Failed to initialize the share partition:
{}-{}. Invalid partition response: {}.",
+ groupId, topicIdPartition, partitionData);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
- if (partitionData.errorCode() != Errors.NONE.code()) {
- KafkaException ex =
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
- log.error("Failed to initialize the share partition:
{}-{}. Exception occurred: {}.",
- groupId, topicIdPartition, partitionData);
- completeInitializationWithException(future, ex);
- return;
- }
+ if (partitionData.errorCode() != Errors.NONE.code()) {
+ KafkaException ex =
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
+ log.error("Failed to initialize the share partition:
{}-{}. Exception occurred: {}.",
+ groupId, topicIdPartition, partitionData);
+ completeInitializationWithException(future, ex);
+ return;
+ }
- try {
- startOffset =
startOffsetDuringInitialization(partitionData.startOffset());
- } catch (Exception e) {
- completeInitializationWithException(future, e);
- return;
- }
- stateEpoch = partitionData.stateEpoch();
-
- List<PersisterStateBatch> stateBatches =
partitionData.stateBatches();
- for (PersisterStateBatch stateBatch : stateBatches) {
- if (stateBatch.firstOffset() < startOffset) {
- log.error("Invalid state batch found for the share
partition: {}-{}. The base offset: {}"
- + " is less than the start offset: {}.",
groupId, topicIdPartition,
- stateBatch.firstOffset(), startOffset);
- completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ try {
+ startOffset =
startOffsetDuringInitialization(partitionData.startOffset());
+ } catch (Exception e) {
+ completeInitializationWithException(future, e);
return;
}
- InFlightBatch inFlightBatch = new
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
- stateBatch.lastOffset(),
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
null);
- cachedState.put(stateBatch.firstOffset(), inFlightBatch);
- }
- // Update the endOffset of the partition.
- if (!cachedState.isEmpty()) {
- // If the cachedState is not empty, findNextFetchOffset
flag is set to true so that any AVAILABLE records
- // in the cached state are not missed
- findNextFetchOffset.set(true);
- endOffset =
cachedState.lastEntry().getValue().lastOffset();
- // In case the persister read state RPC result contains no
AVAILABLE records, we can update cached state
- // and start/end offsets.
- maybeUpdateCachedStateAndOffsets();
- } else {
- endOffset = startOffset;
+ stateEpoch = partitionData.stateEpoch();
+
+ List<PersisterStateBatch> stateBatches =
partitionData.stateBatches();
+ for (PersisterStateBatch stateBatch : stateBatches) {
+ if (stateBatch.firstOffset() < startOffset) {
+ log.error("Invalid state batch found for the share
partition: {}-{}. The base offset: {}"
+ + " is less than the start offset: {}.",
groupId, topicIdPartition,
+ stateBatch.firstOffset(), startOffset);
+ completeInitializationWithException(future, new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition)));
+ return;
+ }
+ InFlightBatch inFlightBatch = new
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
+ stateBatch.lastOffset(),
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
null);
+ cachedState.put(stateBatch.firstOffset(),
inFlightBatch);
+ }
+ // Update the endOffset of the partition.
+ if (!cachedState.isEmpty()) {
+ // If the cachedState is not empty,
findNextFetchOffset flag is set to true so that any AVAILABLE records
+ // in the cached state are not missed
+ findNextFetchOffset.set(true);
+ endOffset =
cachedState.lastEntry().getValue().lastOffset();
+ // In case the persister read state RPC result
contains no AVAILABLE records, we can update cached state
+ // and start/end offsets.
+ maybeUpdateCachedStateAndOffsets();
+ } else {
+ endOffset = startOffset;
+ }
+ // Set the partition state to Active and complete the
future.
+ partitionState = SharePartitionState.ACTIVE;
+ future.complete(null);
Review Comment:
> The registered action for the future could potentially try to acquire
another lock that is held by a different thread
Hmm, I don't see why that should be a problem. There can be multiple threads
waiting on the same lock in other methods as well anyway. Am I missing
something here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]