This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 12d611b5c [#2353] Improvement: Fix the warning: unchecked method
invocation: method `sendCachedBuffer` in class
`RMRecordsReader.RecordsCombiner` is applied to given types (#2358)
12d611b5c is described below
commit 12d611b5cb79cea73203c3da8018cf51751d1fc0
Author: Neo Chien <[email protected]>
AuthorDate: Mon Feb 17 10:24:25 2025 +0800
[#2353] Improvement: Fix the warning: unchecked method invocation: method
`sendCachedBuffer` in class `RMRecordsReader.RecordsCombiner` is applied to
given types (#2358)
### What changes were proposed in this pull request?
Fix the warning: unchecked method invocation: method `sendCachedBuffer` in
class `RMRecordsReader.RecordsCombiner` is applied to given types
### Why are the changes needed?
Fix: #2353
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
current UT
<img width="1114" alt="image"
src="https://github.com/user-attachments/assets/1ac86c0d-d902-47e0-8ff9-1de3fbc3299c"
/>
---
.../uniffle/client/record/reader/RMRecordsReader.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
index 11f6fc6a1..43be684cb 100644
---
a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
+++
b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
@@ -448,7 +448,7 @@ public class RMRecordsReader<K, V, C> {
RecordsFetcher(int partitionId) {
this.partitionId = partitionId;
this.sleepTime = initFetchSleepTime;
- this.recordBuffer = new RecordBuffer(partitionId);
+ this.recordBuffer = new RecordBuffer<>(partitionId);
this.nextQueue =
combiner == null ? mergeBuffers.get(partitionId) :
combineBuffers.get(partitionId);
this.serverInfos = shuffleServerInfoMap.get(partitionId);
@@ -512,7 +512,7 @@ public class RMRecordsReader<K, V, C> {
// split into two different threads, then will be asynchronous
processes. Although it
// seems to save time, it actually consumes more memory.
reader =
- new RecordsReader(
+ new RecordsReader<>(
rssConf,
SerInputStream.newInputStream(byteBuf),
keyClass,
@@ -526,7 +526,7 @@ public class RMRecordsReader<K, V, C> {
}
if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
nextQueue.put(recordBuffer);
- recordBuffer = new RecordBuffer(partitionId);
+ recordBuffer = new RecordBuffer<>(partitionId);
}
recordBuffer.addRecord(reader.getCurrentKey(),
reader.getCurrentValue());
}
@@ -561,12 +561,12 @@ public class RMRecordsReader<K, V, C> {
// The RecordBuffer has a capacity limit, records for the same key may be
// distributed in different RecordBuffers. So we need a cachedBuffer used
// to record the buffer of the last combine.
- private RecordBuffer cached;
+ private RecordBuffer<K, C> cached;
private Queue<RecordBuffer> nextQueue;
RecordsCombiner(int partitionId) {
this.partitionId = partitionId;
- this.cached = new RecordBuffer(partitionId);
+ this.cached = new RecordBuffer<>(partitionId);
this.nextQueue = mergeBuffers.get(partitionId);
setName("RecordsCombiner-" + partitionId);
}
@@ -589,13 +589,13 @@ public class RMRecordsReader<K, V, C> {
// we can send the cached to downstream directly.
if (cached.size() > 0 && !isSameKey(cached.getLastKey(),
current.getFirstKey())) {
sendCachedBuffer(cached);
- cached = new RecordBuffer(partitionId);
+ cached = new RecordBuffer<>(partitionId);
}
// 3 combine the current, then cache it. By this way, we can
handle the specical case
// that next record
// buffer has same key in current.
- RecordBlob recordBlob = new RecordBlob(partitionId);
+ RecordBlob recordBlob = new RecordBlob<>(partitionId);
recordBlob.addRecords(current);
recordBlob.combine(combiner, isMapCombine);
for (Object record : recordBlob.getResult()) {
@@ -616,7 +616,7 @@ public class RMRecordsReader<K, V, C> {
private void sendCachedBuffer(RecordBuffer<K, C> cachedBuffer) throws
InterruptedException {
// Multiple records with the same key may span different recordbuffers.
we were only combined
// within the same recordbuffer. So before send to downstream, we should
combine the cached.
- RecordBlob recordBlob = new RecordBlob(partitionId);
+ RecordBlob recordBlob = new RecordBlob<K, C, Object>(partitionId);
recordBlob.addRecords(cachedBuffer);
recordBlob.combine(combiner, true);
RecordBuffer recordBuffer = new RecordBuffer<>(partitionId);