This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 366b998 KAFKA-13777: Fix potential FetchResponse#responseData race
condition issue (#11963)
366b998 is described below
commit 366b998a229f26aa4601e6b114c2198de0697562
Author: yun-yun <[email protected]>
AuthorDate: Thu Mar 31 09:45:33 2022 +0800
KAFKA-13777: Fix potential FetchResponse#responseData race condition issue
(#11963)
In Fix FetchResponse#responseData, we did a double-checked lock for the
responseData, but the assignment of lazy-initialized object(responseData)
didn't assign in the last step, which would let other threads get the partial
object.
Reviewers: David Jacot <[email protected]>, Luke Chen <[email protected]>
---
.../main/java/org/apache/kafka/common/requests/FetchResponse.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 2e0a02e..0d7049d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -100,7 +100,10 @@ public class FetchResponse extends AbstractResponse {
if (responseData == null) {
synchronized (this) {
if (responseData == null) {
- responseData = new LinkedHashMap<>();
+ // Assigning the lazy-initialized responseData in the last
step
+ // to avoid other threads accessing a half-initialized
object.
+ final LinkedHashMap<TopicPartition,
FetchResponseData.PartitionData> responseDataTmp =
+ new LinkedHashMap<>();
data.responses().forEach(topicResponse -> {
String name;
if (version < 13) {
@@ -110,9 +113,10 @@ public class FetchResponse extends AbstractResponse {
}
if (name != null) {
topicResponse.partitions().forEach(partition ->
- responseData.put(new TopicPartition(name,
partition.partitionIndex()), partition));
+ responseDataTmp.put(new TopicPartition(name,
partition.partitionIndex()), partition));
}
});
+ responseData = responseDataTmp;
}
}
}