This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fd9cad0 [MINOR] test: address unchecked conversions (#624)
4fd9cad0 is described below

commit 4fd9cad07e2943d7f3fa8b2010a43e90520325ff
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Feb 28 10:24:42 2023 +0800

    [MINOR] test: address unchecked conversions (#624)
    
    ### What changes were proposed in this pull request?
    
    Address unchecked conversions in tests by parameterizing raw generic types and adding `@SafeVarargs` to a generic varargs helper.
    
    ### Why are the changes needed?
    
    Reduce warnings in the build log.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
---
 .../hadoop/mapred/SortWriteBufferManagerTest.java  | 11 ++++---
 .../spark/shuffle/reader/RssShuffleReaderTest.java |  6 ++--
 .../spark/shuffle/writer/RssShuffleWriterTest.java | 32 +++++++++----------
 .../java/org/apache/spark/shuffle/TestUtils.java   |  3 +-
 .../spark/shuffle/reader/RssShuffleReaderTest.java | 10 +++---
 .../spark/shuffle/writer/RssShuffleWriterTest.java | 36 +++++++++++-----------
 .../segment/LocalOrderSegmentSplitterTest.java     |  1 +
 .../org/apache/uniffle/test/CombineByKeyTest.java  |  2 +-
 .../java/org/apache/uniffle/test/TestUtils.java    |  2 +-
 .../uniffle/test/WriteAndReadMetricsTest.java      |  6 ++--
 .../org/apache/uniffle/test/GetReaderTest.java     | 28 +++++++++--------
 .../test/GetShuffleReportForMultiPartTest.java     |  4 +--
 12 files changed, 75 insertions(+), 66 deletions(-)

diff --git 
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
 
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 1e07335b..d8c9131f 100644
--- 
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ 
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -59,7 +59,8 @@ public class SortWriteBufferManagerTest {
     Set<Long> failedBlocks = Sets.newConcurrentHashSet();
     Counters.Counter mapOutputByteCounter = new Counters.Counter();
     Counters.Counter mapOutputRecordCounter = new Counters.Counter();
-    SortWriteBufferManager<BytesWritable, BytesWritable> manager = new 
SortWriteBufferManager(
+    SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+    manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
         10240,
         1L,
         10,
@@ -109,7 +110,7 @@ public class SortWriteBufferManagerTest {
     }
     assertFalse(failedBlocks.isEmpty());
     isException = false;
-    manager = new SortWriteBufferManager(
+    manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
         100,
         1L,
         10,
@@ -158,7 +159,8 @@ public class SortWriteBufferManagerTest {
     Set<Long> failedBlocks = Sets.newConcurrentHashSet();
     Counters.Counter mapOutputByteCounter = new Counters.Counter();
     Counters.Counter mapOutputRecordCounter = new Counters.Counter();
-    SortWriteBufferManager<BytesWritable, BytesWritable> manager = new 
SortWriteBufferManager(
+    SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+    manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
         10240,
         1L,
         10,
@@ -206,7 +208,8 @@ public class SortWriteBufferManagerTest {
     Set<Long> failedBlocks = Sets.newConcurrentHashSet();
     Counters.Counter mapOutputByteCounter = new Counters.Counter();
     Counters.Counter mapOutputRecordCounter = new Counters.Counter();
-    SortWriteBufferManager<BytesWritable, BytesWritable> manager = new 
SortWriteBufferManager(
+    SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+    manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
         10240,
         1L,
         10,
diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index fa94faf8..11b2c0c7 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -61,8 +61,8 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
-    ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
+    RssShuffleHandle<String, String, String> handleMock = 
mock(RssShuffleHandle.class);
+    ShuffleDependency<String, String, String> dependencyMock = 
mock(ShuffleDependency.class);
     when(handleMock.getAppId()).thenReturn("appId");
     when(handleMock.getShuffleId()).thenReturn(1);
     when(handleMock.getDependency()).thenReturn(dependencyMock);
@@ -80,7 +80,7 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     when(dependencyMock.aggregator()).thenReturn(Option.empty());
     when(dependencyMock.keyOrdering()).thenReturn(Option.empty());
 
-    RssShuffleReader rssShuffleReaderSpy = spy(new RssShuffleReader<String, 
String>(0, 1, contextMock,
+    RssShuffleReader<String, String> rssShuffleReaderSpy = spy(new 
RssShuffleReader<>(0, 1, contextMock,
         handleMock, basePath, 1000, conf, StorageType.HDFS.name(),
         1000, 2, 10, blockIdBitmap, taskIdBitmap, new RssConf()));
 
diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 25de2e35..81d1bd23 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -84,8 +84,8 @@ public class RssShuffleWriterTest {
     Serializer kryoSerializer = new KryoSerializer(conf);
     ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
     Partitioner mockPartitioner = mock(Partitioner.class);
-    ShuffleDependency mockDependency = mock(ShuffleDependency.class);
-    RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+    ShuffleDependency<String, String, String> mockDependency = 
mock(ShuffleDependency.class);
+    RssShuffleHandle<String, String, String> mockHandle = 
mock(RssShuffleHandle.class);
     when(mockHandle.getDependency()).thenReturn(mockDependency);
     when(mockDependency.partitioner()).thenReturn(mockPartitioner);
     when(mockPartitioner.numPartitions()).thenReturn(2);
@@ -99,7 +99,7 @@ public class RssShuffleWriterTest {
     WriteBufferManager bufferManagerSpy = spy(bufferManager);
     doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
 
-    RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, 
taskId, 1L,
+    RssShuffleWriter<?, ?, ?> rssShuffleWriter = new 
RssShuffleWriter<>("appId", 0, taskId, 1L,
         bufferManagerSpy, (new TaskMetrics()).shuffleWriteMetrics(),
         manager, conf, mockShuffleWriteClient, mockHandle);
 
@@ -166,9 +166,9 @@ public class RssShuffleWriterTest {
     manager.getEventLoop().start();
 
     Partitioner mockPartitioner = mock(Partitioner.class);
-    ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+    ShuffleDependency<String, String, String> mockDependency = 
mock(ShuffleDependency.class);
     final ShuffleWriteClient mockShuffleWriteClient = 
mock(ShuffleWriteClient.class);
-    RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+    RssShuffleHandle<String, String, String> mockHandle = 
mock(RssShuffleHandle.class);
     when(mockHandle.getDependency()).thenReturn(mockDependency);
     Serializer kryoSerializer = new KryoSerializer(conf);
     when(mockDependency.serializer()).thenReturn(kryoSerializer);
@@ -205,20 +205,20 @@ public class RssShuffleWriterTest {
     WriteBufferManager bufferManagerSpy = spy(bufferManager);
     doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
 
-    RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, 
"taskId", 1L,
+    RssShuffleWriter<String, String, String> rssShuffleWriter = new 
RssShuffleWriter<>("appId", 0, "taskId", 1L,
         bufferManagerSpy, shuffleWriteMetrics, manager, conf, 
mockShuffleWriteClient, mockHandle);
 
     RssShuffleWriter<String, String, String> rssShuffleWriterSpy = 
spy(rssShuffleWriter);
     doNothing().when(rssShuffleWriterSpy).sendCommit();
 
     // case 1
-    MutableList<Product2<String, String>> data = new MutableList();
-    data.appendElem(new Tuple2("testKey1", "testValue1"));
-    data.appendElem(new Tuple2("testKey2", "testValue2"));
-    data.appendElem(new Tuple2("testKey3", "testValue3"));
-    data.appendElem(new Tuple2("testKey4", "testValue4"));
-    data.appendElem(new Tuple2("testKey5", "testValue5"));
-    data.appendElem(new Tuple2("testKey6", "testValue6"));
+    MutableList<Product2<String, String>> data = new MutableList<>();
+    data.appendElem(new Tuple2<>("testKey1", "testValue1"));
+    data.appendElem(new Tuple2<>("testKey2", "testValue2"));
+    data.appendElem(new Tuple2<>("testKey3", "testValue3"));
+    data.appendElem(new Tuple2<>("testKey4", "testValue4"));
+    data.appendElem(new Tuple2<>("testKey5", "testValue5"));
+    data.appendElem(new Tuple2<>("testKey6", "testValue6"));
     rssShuffleWriterSpy.write(data.iterator());
 
     assertTrue(rssShuffleWriterSpy.getShuffleWriteMetrics().shuffleWriteTime() 
> 0);
@@ -255,7 +255,7 @@ public class RssShuffleWriterTest {
   public void postBlockEventTest() throws Exception {
     final WriteBufferManager mockBufferManager = 
mock(WriteBufferManager.class);
     final ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
-    ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+    ShuffleDependency<String, String, String> mockDependency = 
mock(ShuffleDependency.class);
     Partitioner mockPartitioner = mock(Partitioner.class);
     final RssShuffleManager mockShuffleManager = mock(RssShuffleManager.class);
     when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -275,14 +275,14 @@ public class RssShuffleWriterTest {
     eventLoop.start();
 
     when(mockShuffleManager.getEventLoop()).thenReturn(eventLoop);
-    RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+    RssShuffleHandle<String, String, String> mockHandle = 
mock(RssShuffleHandle.class);
     when(mockHandle.getDependency()).thenReturn(mockDependency);
     ShuffleWriteClient mockWriteClient = mock(ShuffleWriteClient.class);
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(), "64")
         .set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
 
-    RssShuffleWriter writer = new RssShuffleWriter("appId", 0, "taskId", 1L,
+    RssShuffleWriter<String, String, String> writer = new 
RssShuffleWriter<>("appId", 0, "taskId", 1L,
         mockBufferManager, mockMetrics, mockShuffleManager, conf, 
mockWriteClient, mockHandle);
     List<ShuffleBlockInfo> shuffleBlockInfoList = createShuffleBlockList(1, 
31);
     writer.postBlockEvent(shuffleBlockInfoList);
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
index 99619f16..033966a7 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.writer.AddBlockEvent;
 import org.apache.spark.util.EventLoop;
 
 public class TestUtils {
@@ -32,7 +33,7 @@ public class TestUtils {
   public static RssShuffleManager createShuffleManager(
       SparkConf conf,
       Boolean isDriver,
-      EventLoop loop,
+      EventLoop<AddBlockEvent> loop,
       Map<String, Set<Long>> successBlockIds,
       Map<String, Set<Long>> failBlockIds) {
     return new RssShuffleManager(conf, isDriver, loop, successBlockIds, 
failBlockIds);
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index c0988e2e..a77ce697 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -66,8 +66,8 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
-    ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
+    RssShuffleHandle<String, String, String> handleMock = 
mock(RssShuffleHandle.class);
+    ShuffleDependency<String, String, String> dependencyMock = 
mock(ShuffleDependency.class);
     when(handleMock.getAppId()).thenReturn("appId");
     when(handleMock.getDependency()).thenReturn(dependencyMock);
     when(handleMock.getShuffleId()).thenReturn(1);
@@ -87,7 +87,7 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
 
     Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks = 
Maps.newHashMap();
     partitionToExpectBlocks.put(0, blockIdBitmap);
-    RssShuffleReader rssShuffleReaderSpy = spy(new RssShuffleReader<String, 
String>(
+    RssShuffleReader<String, String> rssShuffleReaderSpy = spy(new 
RssShuffleReader<>(
         0,
         1,
         0,
@@ -111,7 +111,7 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler1, 2, 4, expectedData,
         blockIdBitmap1, "another_key", KRYO_SERIALIZER, 1);
     partitionToExpectBlocks.put(1, blockIdBitmap1);
-    RssShuffleReader rssShuffleReaderSpy1 = spy(new RssShuffleReader<String, 
String>(
+    RssShuffleReader<String, String> rssShuffleReaderSpy1 = spy(new 
RssShuffleReader<>(
         0,
         2,
         0,
@@ -132,7 +132,7 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     ));
     validateResult(rssShuffleReaderSpy1.read(), expectedData, 18);
 
-    RssShuffleReader rssShuffleReaderSpy2 = spy(new RssShuffleReader<String, 
String>(
+    RssShuffleReader<String, String> rssShuffleReaderSpy2 = spy(new 
RssShuffleReader<>(
         0,
         2,
         0,
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index bb92f079..6fe8a45f 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -90,8 +90,8 @@ public class RssShuffleWriterTest {
 
     ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
     Partitioner mockPartitioner = mock(Partitioner.class);
-    RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
-    ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+    RssShuffleHandle<String, String, String> mockHandle = 
mock(RssShuffleHandle.class);
+    ShuffleDependency<String, String, String> mockDependency = 
mock(ShuffleDependency.class);
     when(mockHandle.getDependency()).thenReturn(mockDependency);
     when(mockPartitioner.numPartitions()).thenReturn(2);
     TaskMemoryManager mockTaskMemoryManager = mock(TaskMemoryManager.class);
@@ -104,7 +104,7 @@ public class RssShuffleWriterTest {
         Maps.newHashMap(), mockTaskMemoryManager, new ShuffleWriteMetrics(), 
new RssConf());
     WriteBufferManager bufferManagerSpy = spy(bufferManager);
 
-    RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, 
"taskId", 1L,
+    RssShuffleWriter<String, String, String> rssShuffleWriter = new 
RssShuffleWriter<>("appId", 0, "taskId", 1L,
         bufferManagerSpy, (new TaskMetrics()).shuffleWriteMetrics(),
         manager, conf, mockShuffleWriteClient, mockHandle);
     doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
@@ -176,8 +176,8 @@ public class RssShuffleWriterTest {
     Serializer kryoSerializer = new KryoSerializer(conf);
     Partitioner mockPartitioner = mock(Partitioner.class);
     final ShuffleWriteClient mockShuffleWriteClient = 
mock(ShuffleWriteClient.class);
-    ShuffleDependency mockDependency = mock(ShuffleDependency.class);
-    RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+    ShuffleDependency<String, String, String> mockDependency = 
mock(ShuffleDependency.class);
+    RssShuffleHandle<String, String, String> mockHandle = 
mock(RssShuffleHandle.class);
     when(mockHandle.getDependency()).thenReturn(mockDependency);
     when(mockDependency.serializer()).thenReturn(kryoSerializer);
     when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -211,7 +211,7 @@ public class RssShuffleWriterTest {
         0, 0, bufferOptions, kryoSerializer,
         partitionToServers, mockTaskMemoryManager, shuffleWriteMetrics, new 
RssConf());
     WriteBufferManager bufferManagerSpy = spy(bufferManager);
-    RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, 
"taskId", 1L,
+    RssShuffleWriter<String, String, String> rssShuffleWriter = new 
RssShuffleWriter<>("appId", 0, "taskId", 1L,
         bufferManagerSpy, shuffleWriteMetrics, manager, conf, 
mockShuffleWriteClient, mockHandle);
     doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
 
@@ -220,13 +220,13 @@ public class RssShuffleWriterTest {
     doNothing().when(rssShuffleWriterSpy).sendCommit();
 
     // case 1
-    MutableList<Product2<String, String>> data = new MutableList();
-    data.appendElem(new Tuple2("testKey2", "testValue2"));
-    data.appendElem(new Tuple2("testKey3", "testValue3"));
-    data.appendElem(new Tuple2("testKey4", "testValue4"));
-    data.appendElem(new Tuple2("testKey6", "testValue6"));
-    data.appendElem(new Tuple2("testKey1", "testValue1"));
-    data.appendElem(new Tuple2("testKey5", "testValue5"));
+    MutableList<Product2<String, String>> data = new MutableList<>();
+    data.appendElem(new Tuple2<>("testKey2", "testValue2"));
+    data.appendElem(new Tuple2<>("testKey3", "testValue3"));
+    data.appendElem(new Tuple2<>("testKey4", "testValue4"));
+    data.appendElem(new Tuple2<>("testKey6", "testValue6"));
+    data.appendElem(new Tuple2<>("testKey1", "testValue1"));
+    data.appendElem(new Tuple2<>("testKey5", "testValue5"));
     rssShuffleWriterSpy.write(data.iterator());
 
     assertTrue(shuffleWriteMetrics.writeTime() > 0);
@@ -265,7 +265,7 @@ public class RssShuffleWriterTest {
   @Test
   public void postBlockEventTest() throws Exception {
     WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
-    ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+    ShuffleDependency<String, String, String> mockDependency = 
mock(ShuffleDependency.class);
     ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
     Partitioner mockPartitioner = mock(Partitioner.class);
     when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -290,14 +290,14 @@ public class RssShuffleWriterTest {
         Maps.newConcurrentMap(),
         Maps.newConcurrentMap()));
 
-    RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+    RssShuffleHandle<String, String, String> mockHandle = 
mock(RssShuffleHandle.class);
     when(mockHandle.getDependency()).thenReturn(mockDependency);
     ShuffleWriteClient mockWriteClient = mock(ShuffleWriteClient.class);
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(), "64")
         .set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE.name());
     List<ShuffleBlockInfo> shuffleBlockInfoList = createShuffleBlockList(1, 
31);
-    RssShuffleWriter writer = new RssShuffleWriter("appId", 0, "taskId", 1L,
+    RssShuffleWriter<String, String, String> writer = new 
RssShuffleWriter<>("appId", 0, "taskId", 1L,
         mockBufferManager, mockMetrics, mockShuffleManager, conf, 
mockWriteClient, mockHandle);
     writer.postBlockEvent(shuffleBlockInfoList);
     Thread.sleep(500);
@@ -324,7 +324,7 @@ public class RssShuffleWriterTest {
 
   private void testTwoEvents(
       List<AddBlockEvent> events,
-      RssShuffleWriter writer,
+      RssShuffleWriter<String, String, String> writer,
       int blockNum,
       int blockLength,
       int firstEventSize,
@@ -341,7 +341,7 @@ public class RssShuffleWriterTest {
 
   private void testSingleEvent(
       List<AddBlockEvent> events,
-      RssShuffleWriter writer,
+      RssShuffleWriter<String, String, String> writer,
       int blockNum,
       int blockLength) throws InterruptedException {
     List<ShuffleBlockInfo> shuffleBlockInfoList;
diff --git 
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
 
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index b93ab23b..418cb7f5 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -382,6 +382,7 @@ public class LocalOrderSegmentSplitterTest {
     assertEquals(1, dataSegments.get(1).getLength());
   }
 
+  @SafeVarargs
   public static byte[] generateData(Pair<Integer, Integer>... configEntries) {
     ByteBuffer byteBuffer = ByteBuffer.allocate(configEntries.length * 40);
     int total = 0;
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
index dfacfa9c..64e4ec6a 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
@@ -33,7 +33,7 @@ public class CombineByKeyTest extends SimpleTestBase {
   }
 
   @Override
-  public Map runTest(SparkSession spark, String fileName) throws Exception {
+  public Map<String, Tuple2<Integer, Integer>> runTest(SparkSession spark, 
String fileName) throws Exception {
     // take a rest to make sure shuffle server is registered
     Thread.sleep(4000);
     JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
index 3d4a5056..929c2337 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
@@ -53,7 +53,7 @@ public class TestUtils {
     return javaPairRDD1;
   }
 
-  static JavaPairRDD<String, Tuple2<Integer, Integer>> 
combineByKeyRDD(JavaPairRDD javaPairRDD1) {
+  static JavaPairRDD<String, Tuple2<Integer, Integer>> 
combineByKeyRDD(JavaPairRDD<String, Integer> javaPairRDD1) {
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD = javaPairRDD1
         .combineByKey((Function<Integer, Tuple2<Integer, Integer>>) i -> new 
Tuple2<>(1, i),
             (Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, 
Integer>>)
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
index b38b9fb3..a24204d5 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
@@ -40,7 +40,7 @@ public class WriteAndReadMetricsTest extends SimpleTestBase {
   }
 
   @Override
-  public Map runTest(SparkSession spark, String fileName) throws Exception {
+  public Map<String, Long> runTest(SparkSession spark, String fileName) throws 
Exception {
     // take a rest to make sure shuffle server is registered
     Thread.sleep(3000);
 
@@ -49,9 +49,9 @@ public class WriteAndReadMetricsTest extends SimpleTestBase {
             .otherwise(functions.col("id")).as("key1"), 
functions.col("id").as("value1"));
     df1.createOrReplaceTempView("table1");
 
-    List list = spark.sql("select count(value1) from table1 group by 
key1").collectAsList();
+    List<?> list = spark.sql("select count(value1) from table1 group by 
key1").collectAsList();
     Map<String, Long> result = new HashMap<>();
-    result.put("size", Long.valueOf(list.size()));
+    result.put("size", (long) list.size());
 
     for (int stageId : 
spark.sparkContext().statusTracker().getJobInfo(0).get().stageIds()) {
       long writeRecords = getFirstStageData(spark, 
stageId).shuffleWriteRecords();
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 7b9590e5..a482e0df 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -103,8 +103,9 @@ public class GetReaderTest extends IntegrationTestBase {
     SparkSession sparkSession = 
SparkSession.builder().config(sparkConf).getOrCreate();
     JavaSparkContext jsc1 = new JavaSparkContext(sparkSession.sparkContext());
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD1 = 
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
-    ShuffleDependency shuffleDependency1 = (ShuffleDependency) 
javaPairRDD1.rdd().dependencies().head();
-    RssShuffleHandle rssShuffleHandle1 = (RssShuffleHandle) 
shuffleDependency1.shuffleHandle();
+    ShuffleDependency<?, ?, ?> shuffleDependency1 =
+        (ShuffleDependency<?, ?, ?>) javaPairRDD1.rdd().dependencies().head();
+    RssShuffleHandle<?, ?, ?> rssShuffleHandle1 = (RssShuffleHandle<?, ?, ?>) 
shuffleDependency1.shuffleHandle();
     RemoteStorageInfo remoteStorageInfo1 = 
rssShuffleHandle1.getRemoteStorage();
     assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
     assertTrue(remoteStorageInfo1.getConfItems().isEmpty());
@@ -112,23 +113,26 @@ public class GetReaderTest extends IntegrationTestBase {
     // emptyRDD case
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaEmptyPairRDD1 = 
TestUtils.combineByKeyRDD(
         TestUtils.getEmptyRDD(jsc1));
-    ShuffleDependency emptyShuffleDependency1 = (ShuffleDependency) 
javaEmptyPairRDD1.rdd().dependencies().head();
-    RssShuffleHandle emptyRssShuffleHandle1 = (RssShuffleHandle) 
emptyShuffleDependency1.shuffleHandle();
+    ShuffleDependency<?, ?, ?> emptyShuffleDependency1 =
+        (ShuffleDependency<?, ?, ?>) 
javaEmptyPairRDD1.rdd().dependencies().head();
+    RssShuffleHandle<?, ?, ?> emptyRssShuffleHandle1 =
+        (RssShuffleHandle<?, ?, ?>) emptyShuffleDependency1.shuffleHandle();
     
assertEquals(javaEmptyPairRDD1.rdd().dependencies().head().rdd().getNumPartitions(),
 0);
     assertEquals(emptyRssShuffleHandle1.getPartitionToServers(), 
Collections.emptyMap());
     
assertEquals(emptyRssShuffleHandle1.getRemoteStorage(),RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
 
     // the same app would get the same storage info
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD2 = 
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
-    ShuffleDependency shuffleDependency2 = (ShuffleDependency) 
javaPairRDD2.rdd().dependencies().head();
-    RssShuffleHandle rssShuffleHandle2 = (RssShuffleHandle) 
shuffleDependency2.shuffleHandle();
+    ShuffleDependency<?, ?, ?> shuffleDependency2 =
+        (ShuffleDependency<?, ?, ?>) javaPairRDD2.rdd().dependencies().head();
+    RssShuffleHandle<?, ?, ?> rssShuffleHandle2 = (RssShuffleHandle<?, ?, ?>) 
shuffleDependency2.shuffleHandle();
     RemoteStorageInfo remoteStorageInfo2 = 
rssShuffleHandle2.getRemoteStorage();
     assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
     assertTrue(remoteStorageInfo2.getConfItems().isEmpty());
 
     RssShuffleManager rssShuffleManager = (RssShuffleManager) 
sparkSession.sparkContext().env().shuffleManager();
-    RssShuffleHandle rssShuffleHandle  = (RssShuffleHandle) 
shuffleDependency2.shuffleHandle();
-    RssShuffleReader rssShuffleReader = (RssShuffleReader) 
rssShuffleManager.getReader(
+    RssShuffleHandle<?, ?, ?> rssShuffleHandle  = (RssShuffleHandle<?, ?, ?>) 
shuffleDependency2.shuffleHandle();
+    RssShuffleReader<?, ?> rssShuffleReader = (RssShuffleReader<?, ?>) 
rssShuffleManager.getReader(
         rssShuffleHandle, 0, 0, new MockTaskContext(), new 
TempShuffleReadMetrics());
     Configuration hadoopConf =  rssShuffleReader.getHadoopConf();
     assertNull(hadoopConf.get("k1"));
@@ -141,8 +145,8 @@ public class GetReaderTest extends IntegrationTestBase {
     rssShuffleManager.setAppId("test2");
     JavaSparkContext jsc2 = new JavaSparkContext(sparkSession.sparkContext());
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD = 
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc2));
-    ShuffleDependency shuffleDependency = (ShuffleDependency) 
javaPairRDD.rdd().dependencies().head();
-    rssShuffleHandle = (RssShuffleHandle) shuffleDependency.shuffleHandle();
+    ShuffleDependency<?, ?, ?> shuffleDependency = (ShuffleDependency<?, ?, 
?>) javaPairRDD.rdd().dependencies().head();
+    rssShuffleHandle = (RssShuffleHandle<?, ?, ?>) 
shuffleDependency.shuffleHandle();
     // the reason for sleep here is to ensure that threads can be scheduled 
normally
     Thread.sleep(500);
     RemoteStorageInfo remoteStorageInfo3 = rssShuffleHandle.getRemoteStorage();
@@ -151,7 +155,7 @@ public class GetReaderTest extends IntegrationTestBase {
     assertEquals("v1", remoteStorageInfo3.getConfItems().get("k1"));
     assertEquals("v2", remoteStorageInfo3.getConfItems().get("k2"));
 
-    rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
+    rssShuffleReader = (RssShuffleReader<?, ?>) rssShuffleManager.getReader(
         rssShuffleHandle, 0, 0, new MockTaskContext(), new 
TempShuffleReadMetrics());
     hadoopConf =  rssShuffleReader.getHadoopConf();
     assertEquals("v1", hadoopConf.get("k1"));
@@ -161,7 +165,7 @@ public class GetReaderTest extends IntegrationTestBase {
     assertNull(commonHadoopConf.get("k1"));
     assertNull(commonHadoopConf.get("k2"));
 
-    rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
+    rssShuffleReader = (RssShuffleReader<?, ?>) rssShuffleManager.getReader(
         rssShuffleHandle, 0, 0, new MockTaskContext(), new 
TempShuffleReadMetrics());
     hadoopConf =  rssShuffleReader.getHadoopConf();
     assertEquals("v1", hadoopConf.get("k1"));
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index ec7a7c93..6a7553ae 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -142,7 +142,7 @@ public class GetShuffleReportForMultiPartTest extends 
SparkIntegrationTestBase {
   }
 
   @Override
-  Map runTest(SparkSession spark, String fileName) throws Exception {
+  Map<Integer, String> runTest(SparkSession spark, String fileName) throws 
Exception {
     Thread.sleep(4000);
     Map<Integer, String> map = Maps.newHashMap();
     Dataset<Row> df2 = spark.range(0, 1000, 1, 10)
@@ -218,7 +218,7 @@ public class GetShuffleReportForMultiPartTest extends 
SparkIntegrationTestBase {
         ShuffleReadMetricsReporter metrics,
         Roaring64NavigableMap taskIdBitmap) {
       int shuffleId = handle.shuffleId();
-      RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) handle;
+      RssShuffleHandle<?, ?, ?> rssShuffleHandle = (RssShuffleHandle<?, ?, ?>) 
handle;
       Map<Integer, List<ShuffleServerInfo>> allPartitionToServers = 
rssShuffleHandle.getPartitionToServers();
       int partitionNum = (int) allPartitionToServers.entrySet().stream()
                                    .filter(x -> x.getKey() >= startPartition 
&& x.getKey() < endPartition).count();

Reply via email to