[
https://issues.apache.org/jira/browse/KAFKA-18852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Raju Gupta updated KAFKA-18852:
-------------------------------
Description:
h2. Analysis of Changes in ApiVersions Class
The *ApiVersions* class has been updated to enhance *thread safety* and
*performance* using concurrent data structures.
h3. Key Improvements
* *ConcurrentHashMap for nodeApiVersions*
* *Before*: Used `HashMap` with synchronized access.
* *After*: Replaced with `ConcurrentHashMap`, eliminating explicit
synchronization.
* *Benefits*: Improved performance (concurrent reads/writes), reduced
complexity.
* *AtomicLong for maxFinalizedFeaturesEpoch*
* *Before*: `long` with synchronized updates.
* *After*: `AtomicLong` ensures atomic updates without locks.
* *Benefits*: Faster updates using CPU-level atomic operations.
* *Removed synchronized Blocks*
* *Impact*: Reduces lock contention, improving scalability.
* *Consideration*: Future modifications must ensure correct usage of
concurrent structures.
* *Handling finalizedFeatures*
* *Issue*: Still uses `Map<String, Short>`, leading to potential race
conditions.
* *Fix*: Replaced with `AtomicReference<Map<String, Short>>` for atomic
updates.
h3. Updated Code
{code:java}
private final Map<String, NodeApiVersions> nodeApiVersions = new
ConcurrentHashMap<>();
private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
private final AtomicReference<Map<String, Short>> finalizedFeatures = new
AtomicReference<>(new ConcurrentHashMap<>());
public void update(String nodeId, NodeApiVersions nodeApiVersions) {
this.nodeApiVersions.put(nodeId, nodeApiVersions);
maxFinalizedFeaturesEpoch.updateAndGet(prev ->
Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
this.finalizedFeatures.set(new
ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
}
public NodeApiVersions get(String nodeId) {
return this.nodeApiVersions.get();
}
public long getMaxFinalizedFeaturesEpoch() {
return maxFinalizedFeaturesEpoch.get();
}
public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(),
finalizedFeatures.get());
}
{code}
h3. Summary
* *Thread Safety*: Ensured with `ConcurrentHashMap` and `AtomicLong`.
* *Performance*: Reduced locks improve multi-threaded efficiency.
* *Scalability*: Supports high concurrency better.
* *Next Steps*: Test for edge cases and update documentation.
was:
### Analysis of Changes in `ApiVersions` Class
The `ApiVersions` class has been updated to enhance **thread safety** and
**performance** using concurrent data structures.
#### Key Improvements
1. **`ConcurrentHashMap` for `nodeApiVersions`**
- **Before**: Used `HashMap` with synchronized access.
- **After**: Replaced with `ConcurrentHashMap`, eliminating explicit
synchronization.
- **Benefits**: Improved performance (concurrent reads/writes), reduced
complexity.
2. **`AtomicLong` for `maxFinalizedFeaturesEpoch`**
- **Before**: `long` with synchronized updates.
- **After**: `AtomicLong` ensures atomic updates without locks.
- **Benefits**: Faster updates using CPU-level atomic operations.
3. **Removed `synchronized` Blocks**
- **Impact**: Reduces lock contention, improving scalability.
- **Consideration**: Future modifications must ensure correct usage of
concurrent structures.
4. **Handling `finalizedFeatures`**
- **Issue**: Still uses `Map<String, Short>`, leading to potential race
conditions.
- **Fix**: Replaced with `AtomicReference<Map<String, Short>>` for atomic
updates.
#### Updated Code
```java
private final Map<String, NodeApiVersions> nodeApiVersions = new
ConcurrentHashMap<>();
private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
private final AtomicReference<Map<String, Short>> finalizedFeatures = new
AtomicReference<>(new ConcurrentHashMap<>());
public void update(String nodeId, NodeApiVersions nodeApiVersions) {
this.nodeApiVersions.put(nodeId, nodeApiVersions);
maxFinalizedFeaturesEpoch.updateAndGet(prev ->
Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
this.finalizedFeatures.set(new
ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
}
public NodeApiVersions get(String nodeId) {
return this.nodeApiVersions.get();
}
public long getMaxFinalizedFeaturesEpoch() {
return maxFinalizedFeaturesEpoch.get();
}
public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(),
finalizedFeatures.get());
}
> ApiVersions should use Concurrent Collections instead of sychronised
> --------------------------------------------------------------------
>
> Key: KAFKA-18852
> URL: https://issues.apache.org/jira/browse/KAFKA-18852
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Raju Gupta
> Priority: Minor
> Attachments: KAFKA-18552.patch
>
>
> h2. Analysis of Changes in ApiVersions Class
> The *ApiVersions* class has been updated to enhance *thread safety* and
> *performance* using concurrent data structures.
> h3. Key Improvements
> * *ConcurrentHashMap for nodeApiVersions*
> * *Before*: Used `HashMap` with synchronized access.
> * *After*: Replaced with `ConcurrentHashMap`, eliminating explicit
> synchronization.
> * *Benefits*: Improved performance (concurrent reads/writes), reduced
> complexity.
> * *AtomicLong for maxFinalizedFeaturesEpoch*
> * *Before*: `long` with synchronized updates.
> * *After*: `AtomicLong` ensures atomic updates without locks.
> * *Benefits*: Faster updates using CPU-level atomic operations.
> * *Removed synchronized Blocks*
> * *Impact*: Reduces lock contention, improving scalability.
> * *Consideration*: Future modifications must ensure correct usage of
> concurrent structures.
> * *Handling finalizedFeatures*
> * *Issue*: Still uses `Map<String, Short>`, leading to potential race
> conditions.
> * *Fix*: Replaced with `AtomicReference<Map<String, Short>>` for atomic
> updates.
> h3. Updated Code
> {code:java}
> private final Map<String, NodeApiVersions> nodeApiVersions = new
> ConcurrentHashMap<>();
> private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
> private final AtomicReference<Map<String, Short>> finalizedFeatures = new
> AtomicReference<>(new ConcurrentHashMap<>());
> public void update(String nodeId, NodeApiVersions nodeApiVersions) {
> this.nodeApiVersions.put(nodeId, nodeApiVersions);
> maxFinalizedFeaturesEpoch.updateAndGet(prev ->
> Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
> this.finalizedFeatures.set(new
> ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
> }
> public NodeApiVersions get(String nodeId) {
> return this.nodeApiVersions.get();
> }
> public long getMaxFinalizedFeaturesEpoch() {
> return maxFinalizedFeaturesEpoch.get();
> }
> public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
> return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(),
> finalizedFeatures.get());
> }
> {code}
> h3. Summary
> * *Thread Safety*: Ensured with `ConcurrentHashMap` and `AtomicLong`.
> * *Performance*: Reduced locks improve multi-threaded efficiency.
> * *Scalability*: Supports high concurrency better.
> * *Next Steps*: Test for edge cases and update documentation.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)