This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 12d611b5c [#2353] Improvement: Fix the warning: unchecked method 
invocation: method `sendCachedBuffer` in class 
`RMRecordsReader.RecordsCombiner` is applied to given types (#2358)
12d611b5c is described below

commit 12d611b5cb79cea73203c3da8018cf51751d1fc0
Author: Neo Chien <[email protected]>
AuthorDate: Mon Feb 17 10:24:25 2025 +0800

    [#2353] Improvement: Fix the warning: unchecked method invocation: method 
`sendCachedBuffer` in class `RMRecordsReader.RecordsCombiner` is applied to 
given types (#2358)
    
    ### What changes were proposed in this pull request?
    Fix the warning: unchecked method invocation: method `sendCachedBuffer` in 
class `RMRecordsReader.RecordsCombiner` is applied to given types
    
    ### Why are the changes needed?
    Fix: #2353
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    current UT
    
    <img width="1114" alt="image" 
src="https://github.com/user-attachments/assets/1ac86c0d-d902-47e0-8ff9-1de3fbc3299c"
 />
---
 .../uniffle/client/record/reader/RMRecordsReader.java    | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
 
b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
index 11f6fc6a1..43be684cb 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
@@ -448,7 +448,7 @@ public class RMRecordsReader<K, V, C> {
     RecordsFetcher(int partitionId) {
       this.partitionId = partitionId;
       this.sleepTime = initFetchSleepTime;
-      this.recordBuffer = new RecordBuffer(partitionId);
+      this.recordBuffer = new RecordBuffer<>(partitionId);
       this.nextQueue =
           combiner == null ? mergeBuffers.get(partitionId) : 
combineBuffers.get(partitionId);
       this.serverInfos = shuffleServerInfoMap.get(partitionId);
@@ -512,7 +512,7 @@ public class RMRecordsReader<K, V, C> {
               // split into two different threads, then will be asynchronous 
processes. Although it
               // seems to save time, it actually consumes more memory.
               reader =
-                  new RecordsReader(
+                  new RecordsReader<>(
                       rssConf,
                       SerInputStream.newInputStream(byteBuf),
                       keyClass,
@@ -526,7 +526,7 @@ public class RMRecordsReader<K, V, C> {
                 }
                 if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
                   nextQueue.put(recordBuffer);
-                  recordBuffer = new RecordBuffer(partitionId);
+                  recordBuffer = new RecordBuffer<>(partitionId);
                 }
                 recordBuffer.addRecord(reader.getCurrentKey(), 
reader.getCurrentValue());
               }
@@ -561,12 +561,12 @@ public class RMRecordsReader<K, V, C> {
     // The RecordBuffer has a capacity limit, records for the same key may be
     // distributed in different RecordBuffers. So we need a cachedBuffer used
     // to record the buffer of the last combine.
-    private RecordBuffer cached;
+    private RecordBuffer<K, C> cached;
     private Queue<RecordBuffer> nextQueue;
 
     RecordsCombiner(int partitionId) {
       this.partitionId = partitionId;
-      this.cached = new RecordBuffer(partitionId);
+      this.cached = new RecordBuffer<>(partitionId);
       this.nextQueue = mergeBuffers.get(partitionId);
       setName("RecordsCombiner-" + partitionId);
     }
@@ -589,13 +589,13 @@ public class RMRecordsReader<K, V, C> {
             //   we can send the cached to downstream directly.
             if (cached.size() > 0 && !isSameKey(cached.getLastKey(), 
current.getFirstKey())) {
               sendCachedBuffer(cached);
-              cached = new RecordBuffer(partitionId);
+              cached = new RecordBuffer<>(partitionId);
             }
 
            // 3 combine the current, then cache it. By this way, we can 
handle the special case
            // that the next record
            // buffer has the same key as current.
-            RecordBlob recordBlob = new RecordBlob(partitionId);
+            RecordBlob recordBlob = new RecordBlob<>(partitionId);
             recordBlob.addRecords(current);
             recordBlob.combine(combiner, isMapCombine);
             for (Object record : recordBlob.getResult()) {
@@ -616,7 +616,7 @@ public class RMRecordsReader<K, V, C> {
     private void sendCachedBuffer(RecordBuffer<K, C> cachedBuffer) throws 
InterruptedException {
      // Multiple records with the same key may span different RecordBuffers. 
We only combined
      // within the same RecordBuffer. So before sending downstream, we should 
combine the cached.
-      RecordBlob recordBlob = new RecordBlob(partitionId);
+      RecordBlob recordBlob = new RecordBlob<K, C, Object>(partitionId);
       recordBlob.addRecords(cachedBuffer);
       recordBlob.combine(combiner, true);
       RecordBuffer recordBuffer = new RecordBuffer<>(partitionId);

Reply via email to