[
https://issues.apache.org/jira/browse/KAFKA-18852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Raju Gupta updated KAFKA-18852:
-------------------------------
Description:
### Analysis of Changes in `ApiVersions` Class
The `ApiVersions` class has been updated to enhance **thread safety** and
**performance** using concurrent data structures.
#### Key Improvements
1. **`ConcurrentHashMap` for `nodeApiVersions`**
- **Before**: Used `HashMap` with synchronized access.
- **After**: Replaced with `ConcurrentHashMap`, eliminating explicit
synchronization.
- **Benefits**: Improved performance (concurrent reads/writes), reduced
complexity.
2. **`AtomicLong` for `maxFinalizedFeaturesEpoch`**
- **Before**: `long` with synchronized updates.
- **After**: `AtomicLong` ensures atomic updates without locks.
- **Benefits**: Faster updates using CPU-level atomic operations.
3. **Removed `synchronized` Blocks**
- **Impact**: Reduces lock contention, improving scalability.
- **Consideration**: Future modifications must ensure correct usage of
concurrent structures.
4. **Handling `finalizedFeatures`**
- **Issue**: Still uses `Map<String, Short>`, leading to potential race
conditions.
- **Fix**: Replaced with `AtomicReference<Map<String, Short>>` for atomic
updates.
#### Updated Code
```java
private final Map<String, NodeApiVersions> nodeApiVersions = new
ConcurrentHashMap<>();
private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
private final AtomicReference<Map<String, Short>> finalizedFeatures = new
AtomicReference<>(new ConcurrentHashMap<>());
public void update(String nodeId, NodeApiVersions nodeApiVersions) {
this.nodeApiVersions.put(nodeId, nodeApiVersions);
maxFinalizedFeaturesEpoch.updateAndGet(prev ->
Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
this.finalizedFeatures.set(new
ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
}
public NodeApiVersions get(String nodeId) {
return this.nodeApiVersions.get();
}
public long getMaxFinalizedFeaturesEpoch() {
return maxFinalizedFeaturesEpoch.get();
}
public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(),
finalizedFeatures.get());
}
> ApiVersions should use Concurrent Collections instead of sychronised
> --------------------------------------------------------------------
>
> Key: KAFKA-18852
> URL: https://issues.apache.org/jira/browse/KAFKA-18852
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Raju Gupta
> Priority: Minor
> Attachments: KAFKA-18552.patch
>
>
> ### Analysis of Changes in `ApiVersions` Class
> The `ApiVersions` class has been updated to enhance **thread safety** and
> **performance** using concurrent data structures.
> #### Key Improvements
> 1. **`ConcurrentHashMap` for `nodeApiVersions`**
> - **Before**: Used `HashMap` with synchronized access.
> - **After**: Replaced with `ConcurrentHashMap`, eliminating explicit
> synchronization.
> - **Benefits**: Improved performance (concurrent reads/writes), reduced
> complexity.
> 2. **`AtomicLong` for `maxFinalizedFeaturesEpoch`**
> - **Before**: `long` with synchronized updates.
> - **After**: `AtomicLong` ensures atomic updates without locks.
> - **Benefits**: Faster updates using CPU-level atomic operations.
> 3. **Removed `synchronized` Blocks**
> - **Impact**: Reduces lock contention, improving scalability.
> - **Consideration**: Future modifications must ensure correct usage of
> concurrent structures.
> 4. **Handling `finalizedFeatures`**
> - **Issue**: Still uses `Map<String, Short>`, leading to potential race
> conditions.
> - **Fix**: Replaced with `AtomicReference<Map<String, Short>>` for atomic
> updates.
> #### Updated Code
> ```java
> private final Map<String, NodeApiVersions> nodeApiVersions = new
> ConcurrentHashMap<>();
> private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
> private final AtomicReference<Map<String, Short>> finalizedFeatures = new
> AtomicReference<>(new ConcurrentHashMap<>());
> public void update(String nodeId, NodeApiVersions nodeApiVersions) {
> this.nodeApiVersions.put(nodeId, nodeApiVersions);
> maxFinalizedFeaturesEpoch.updateAndGet(prev ->
> Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
> this.finalizedFeatures.set(new
> ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
> }
> public NodeApiVersions get(String nodeId) {
> return this.nodeApiVersions.get();
> }
> public long getMaxFinalizedFeaturesEpoch() {
> return maxFinalizedFeaturesEpoch.get();
> }
> public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
> return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(),
> finalizedFeatures.get());
> }
--
This message was sent by Atlassian Jira
(v8.20.10#820010)