This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4fd9cad0 [MINOR] test: address unchecked conversions (#624)
4fd9cad0 is described below
commit 4fd9cad07e2943d7f3fa8b2010a43e90520325ff
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Feb 28 10:24:42 2023 +0800
[MINOR] test: address unchecked conversions (#624)
### What changes were proposed in this pull request?
Address unchecked conversions in tests.
### Why are the changes needed?
Reduce warnings in build log.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
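For context on the warning class being addressed: javac reports an "unchecked conversion" whenever a raw type is assigned to a parameterized type, and the usual fix is to parameterize the expression (with the diamond operator where the target type is known, or wildcards where it is not). A minimal standalone sketch of the idea, not taken from this patch:
```java
import java.util.ArrayList;
import java.util.List;

public class UncheckedConversionExample {
  public static void main(String[] args) {
    // Raw type on the right-hand side: javac warns about an unchecked conversion
    List<String> warned = new ArrayList();

    // Parameterized via the diamond operator: same behavior, no warning
    List<String> clean = new ArrayList<>();

    warned.add("raw");
    clean.add("generic");
    System.out.println(warned + " " + clean);
  }
}
```
The diff below applies the same idea to the test code: mocked generic types such as RssShuffleHandle and ShuffleDependency are declared with their type parameters, raw constructor calls gain type arguments or the diamond operator, and the generic-varargs helper is annotated with @SafeVarargs.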
---
.../hadoop/mapred/SortWriteBufferManagerTest.java | 11 ++++---
.../spark/shuffle/reader/RssShuffleReaderTest.java | 6 ++--
.../spark/shuffle/writer/RssShuffleWriterTest.java | 32 +++++++++----------
.../java/org/apache/spark/shuffle/TestUtils.java | 3 +-
.../spark/shuffle/reader/RssShuffleReaderTest.java | 10 +++---
.../spark/shuffle/writer/RssShuffleWriterTest.java | 36 +++++++++++-----------
.../segment/LocalOrderSegmentSplitterTest.java | 1 +
.../org/apache/uniffle/test/CombineByKeyTest.java | 2 +-
.../java/org/apache/uniffle/test/TestUtils.java | 2 +-
.../uniffle/test/WriteAndReadMetricsTest.java | 6 ++--
.../org/apache/uniffle/test/GetReaderTest.java | 28 +++++++++--------
.../test/GetShuffleReportForMultiPartTest.java | 4 +--
12 files changed, 75 insertions(+), 66 deletions(-)
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 1e07335b..d8c9131f 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -59,7 +59,8 @@ public class SortWriteBufferManagerTest {
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager = new SortWriteBufferManager(
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
10240,
1L,
10,
@@ -109,7 +110,7 @@ public class SortWriteBufferManagerTest {
}
assertFalse(failedBlocks.isEmpty());
isException = false;
- manager = new SortWriteBufferManager(
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
100,
1L,
10,
@@ -158,7 +159,8 @@ public class SortWriteBufferManagerTest {
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager = new SortWriteBufferManager(
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
10240,
1L,
10,
@@ -206,7 +208,8 @@ public class SortWriteBufferManagerTest {
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager = new SortWriteBufferManager(
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
10240,
1L,
10,
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index fa94faf8..11b2c0c7 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -61,8 +61,8 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
- ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> handleMock = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> dependencyMock = mock(ShuffleDependency.class);
when(handleMock.getAppId()).thenReturn("appId");
when(handleMock.getShuffleId()).thenReturn(1);
when(handleMock.getDependency()).thenReturn(dependencyMock);
@@ -80,7 +80,7 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
when(dependencyMock.aggregator()).thenReturn(Option.empty());
when(dependencyMock.keyOrdering()).thenReturn(Option.empty());
- RssShuffleReader rssShuffleReaderSpy = spy(new RssShuffleReader<String, String>(0, 1, contextMock,
+ RssShuffleReader<String, String> rssShuffleReaderSpy = spy(new RssShuffleReader<>(0, 1, contextMock,
handleMock, basePath, 1000, conf, StorageType.HDFS.name(),
1000, 2, 10, blockIdBitmap, taskIdBitmap, new RssConf()));
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 25de2e35..81d1bd23 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -84,8 +84,8 @@ public class RssShuffleWriterTest {
Serializer kryoSerializer = new KryoSerializer(conf);
ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
when(mockPartitioner.numPartitions()).thenReturn(2);
@@ -99,7 +99,7 @@ public class RssShuffleWriterTest {
WriteBufferManager bufferManagerSpy = spy(bufferManager);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, taskId, 1L,
+ RssShuffleWriter<?, ?, ?> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, taskId, 1L,
bufferManagerSpy, (new TaskMetrics()).shuffleWriteMetrics(),
manager, conf, mockShuffleWriteClient, mockHandle);
@@ -166,9 +166,9 @@ public class RssShuffleWriterTest {
manager.getEventLoop().start();
Partitioner mockPartitioner = mock(Partitioner.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
final ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
Serializer kryoSerializer = new KryoSerializer(conf);
when(mockDependency.serializer()).thenReturn(kryoSerializer);
@@ -205,20 +205,20 @@ public class RssShuffleWriterTest {
WriteBufferManager bufferManagerSpy = spy(bufferManager);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
bufferManagerSpy, shuffleWriteMetrics, manager, conf,
mockShuffleWriteClient, mockHandle);
RssShuffleWriter<String, String, String> rssShuffleWriterSpy = spy(rssShuffleWriter);
doNothing().when(rssShuffleWriterSpy).sendCommit();
// case 1
- MutableList<Product2<String, String>> data = new MutableList();
- data.appendElem(new Tuple2("testKey1", "testValue1"));
- data.appendElem(new Tuple2("testKey2", "testValue2"));
- data.appendElem(new Tuple2("testKey3", "testValue3"));
- data.appendElem(new Tuple2("testKey4", "testValue4"));
- data.appendElem(new Tuple2("testKey5", "testValue5"));
- data.appendElem(new Tuple2("testKey6", "testValue6"));
+ MutableList<Product2<String, String>> data = new MutableList<>();
+ data.appendElem(new Tuple2<>("testKey1", "testValue1"));
+ data.appendElem(new Tuple2<>("testKey2", "testValue2"));
+ data.appendElem(new Tuple2<>("testKey3", "testValue3"));
+ data.appendElem(new Tuple2<>("testKey4", "testValue4"));
+ data.appendElem(new Tuple2<>("testKey5", "testValue5"));
+ data.appendElem(new Tuple2<>("testKey6", "testValue6"));
rssShuffleWriterSpy.write(data.iterator());
assertTrue(rssShuffleWriterSpy.getShuffleWriteMetrics().shuffleWriteTime() > 0);
@@ -255,7 +255,7 @@ public class RssShuffleWriterTest {
public void postBlockEventTest() throws Exception {
final WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
final ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
Partitioner mockPartitioner = mock(Partitioner.class);
final RssShuffleManager mockShuffleManager = mock(RssShuffleManager.class);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -275,14 +275,14 @@ public class RssShuffleWriterTest {
eventLoop.start();
when(mockShuffleManager.getEventLoop()).thenReturn(eventLoop);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
ShuffleWriteClient mockWriteClient = mock(ShuffleWriteClient.class);
SparkConf conf = new SparkConf();
conf.set(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(), "64")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
- RssShuffleWriter writer = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> writer = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
mockBufferManager, mockMetrics, mockShuffleManager, conf,
mockWriteClient, mockHandle);
List<ShuffleBlockInfo> shuffleBlockInfoList = createShuffleBlockList(1, 31);
writer.postBlockEvent(shuffleBlockInfoList);
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
index 99619f16..033966a7 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.spark.util.EventLoop;
public class TestUtils {
@@ -32,7 +33,7 @@ public class TestUtils {
public static RssShuffleManager createShuffleManager(
SparkConf conf,
Boolean isDriver,
- EventLoop loop,
+ EventLoop<AddBlockEvent> loop,
Map<String, Set<Long>> successBlockIds,
Map<String, Set<Long>> failBlockIds) {
return new RssShuffleManager(conf, isDriver, loop, successBlockIds,
failBlockIds);
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index c0988e2e..a77ce697 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -66,8 +66,8 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
- ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> handleMock = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> dependencyMock = mock(ShuffleDependency.class);
when(handleMock.getAppId()).thenReturn("appId");
when(handleMock.getDependency()).thenReturn(dependencyMock);
when(handleMock.getShuffleId()).thenReturn(1);
@@ -87,7 +87,7 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks = Maps.newHashMap();
partitionToExpectBlocks.put(0, blockIdBitmap);
- RssShuffleReader rssShuffleReaderSpy = spy(new RssShuffleReader<String, String>(
+ RssShuffleReader<String, String> rssShuffleReaderSpy = spy(new RssShuffleReader<>(
0,
1,
0,
@@ -111,7 +111,7 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
writeTestData(writeHandler1, 2, 4, expectedData,
blockIdBitmap1, "another_key", KRYO_SERIALIZER, 1);
partitionToExpectBlocks.put(1, blockIdBitmap1);
- RssShuffleReader rssShuffleReaderSpy1 = spy(new RssShuffleReader<String, String>(
+ RssShuffleReader<String, String> rssShuffleReaderSpy1 = spy(new RssShuffleReader<>(
0,
2,
0,
@@ -132,7 +132,7 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
));
validateResult(rssShuffleReaderSpy1.read(), expectedData, 18);
- RssShuffleReader rssShuffleReaderSpy2 = spy(new RssShuffleReader<String, String>(
+ RssShuffleReader<String, String> rssShuffleReaderSpy2 = spy(new RssShuffleReader<>(
0,
2,
0,
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index bb92f079..6fe8a45f 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -90,8 +90,8 @@ public class RssShuffleWriterTest {
ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
when(mockPartitioner.numPartitions()).thenReturn(2);
TaskMemoryManager mockTaskMemoryManager = mock(TaskMemoryManager.class);
@@ -104,7 +104,7 @@ public class RssShuffleWriterTest {
Maps.newHashMap(), mockTaskMemoryManager, new ShuffleWriteMetrics(),
new RssConf());
WriteBufferManager bufferManagerSpy = spy(bufferManager);
- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
bufferManagerSpy, (new TaskMetrics()).shuffleWriteMetrics(),
manager, conf, mockShuffleWriteClient, mockHandle);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
@@ -176,8 +176,8 @@ public class RssShuffleWriterTest {
Serializer kryoSerializer = new KryoSerializer(conf);
Partitioner mockPartitioner = mock(Partitioner.class);
final ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
when(mockDependency.serializer()).thenReturn(kryoSerializer);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -211,7 +211,7 @@ public class RssShuffleWriterTest {
0, 0, bufferOptions, kryoSerializer,
partitionToServers, mockTaskMemoryManager, shuffleWriteMetrics, new RssConf());
WriteBufferManager bufferManagerSpy = spy(bufferManager);
- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
bufferManagerSpy, shuffleWriteMetrics, manager, conf,
mockShuffleWriteClient, mockHandle);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
@@ -220,13 +220,13 @@ public class RssShuffleWriterTest {
doNothing().when(rssShuffleWriterSpy).sendCommit();
// case 1
- MutableList<Product2<String, String>> data = new MutableList();
- data.appendElem(new Tuple2("testKey2", "testValue2"));
- data.appendElem(new Tuple2("testKey3", "testValue3"));
- data.appendElem(new Tuple2("testKey4", "testValue4"));
- data.appendElem(new Tuple2("testKey6", "testValue6"));
- data.appendElem(new Tuple2("testKey1", "testValue1"));
- data.appendElem(new Tuple2("testKey5", "testValue5"));
+ MutableList<Product2<String, String>> data = new MutableList<>();
+ data.appendElem(new Tuple2<>("testKey2", "testValue2"));
+ data.appendElem(new Tuple2<>("testKey3", "testValue3"));
+ data.appendElem(new Tuple2<>("testKey4", "testValue4"));
+ data.appendElem(new Tuple2<>("testKey6", "testValue6"));
+ data.appendElem(new Tuple2<>("testKey1", "testValue1"));
+ data.appendElem(new Tuple2<>("testKey5", "testValue5"));
rssShuffleWriterSpy.write(data.iterator());
assertTrue(shuffleWriteMetrics.writeTime() > 0);
@@ -265,7 +265,7 @@ public class RssShuffleWriterTest {
@Test
public void postBlockEventTest() throws Exception {
WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
Partitioner mockPartitioner = mock(Partitioner.class);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -290,14 +290,14 @@ public class RssShuffleWriterTest {
Maps.newConcurrentMap(),
Maps.newConcurrentMap()));
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
ShuffleWriteClient mockWriteClient = mock(ShuffleWriteClient.class);
SparkConf conf = new SparkConf();
conf.set(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(), "64")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
List<ShuffleBlockInfo> shuffleBlockInfoList = createShuffleBlockList(1, 31);
- RssShuffleWriter writer = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> writer = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
mockBufferManager, mockMetrics, mockShuffleManager, conf,
mockWriteClient, mockHandle);
writer.postBlockEvent(shuffleBlockInfoList);
Thread.sleep(500);
@@ -324,7 +324,7 @@ public class RssShuffleWriterTest {
private void testTwoEvents(
List<AddBlockEvent> events,
- RssShuffleWriter writer,
+ RssShuffleWriter<String, String, String> writer,
int blockNum,
int blockLength,
int firstEventSize,
@@ -341,7 +341,7 @@ public class RssShuffleWriterTest {
private void testSingleEvent(
List<AddBlockEvent> events,
- RssShuffleWriter writer,
+ RssShuffleWriter<String, String, String> writer,
int blockNum,
int blockLength) throws InterruptedException {
List<ShuffleBlockInfo> shuffleBlockInfoList;
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index b93ab23b..418cb7f5 100644
--- a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -382,6 +382,7 @@ public class LocalOrderSegmentSplitterTest {
assertEquals(1, dataSegments.get(1).getLength());
}
+ @SafeVarargs
public static byte[] generateData(Pair<Integer, Integer>... configEntries) {
ByteBuffer byteBuffer = ByteBuffer.allocate(configEntries.length * 40);
int total = 0;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
index dfacfa9c..64e4ec6a 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
@@ -33,7 +33,7 @@ public class CombineByKeyTest extends SimpleTestBase {
}
@Override
- public Map runTest(SparkSession spark, String fileName) throws Exception {
+ public Map<String, Tuple2<Integer, Integer>> runTest(SparkSession spark, String fileName) throws Exception {
// take a rest to make sure shuffle server is registered
Thread.sleep(4000);
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
index 3d4a5056..929c2337 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
@@ -53,7 +53,7 @@ public class TestUtils {
return javaPairRDD1;
}
- static JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD(JavaPairRDD javaPairRDD1) {
+ static JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD(JavaPairRDD<String, Integer> javaPairRDD1) {
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD = javaPairRDD1
.combineByKey((Function<Integer, Tuple2<Integer, Integer>>) i -> new Tuple2<>(1, i),
(Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>)
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
index b38b9fb3..a24204d5 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
@@ -40,7 +40,7 @@ public class WriteAndReadMetricsTest extends SimpleTestBase {
}
@Override
- public Map runTest(SparkSession spark, String fileName) throws Exception {
+ public Map<String, Long> runTest(SparkSession spark, String fileName) throws Exception {
// take a rest to make sure shuffle server is registered
Thread.sleep(3000);
@@ -49,9 +49,9 @@ public class WriteAndReadMetricsTest extends SimpleTestBase {
.otherwise(functions.col("id")).as("key1"),
functions.col("id").as("value1"));
df1.createOrReplaceTempView("table1");
- List list = spark.sql("select count(value1) from table1 group by key1").collectAsList();
+ List<?> list = spark.sql("select count(value1) from table1 group by key1").collectAsList();
Map<String, Long> result = new HashMap<>();
- result.put("size", Long.valueOf(list.size()));
+ result.put("size", (long) list.size());
for (int stageId : spark.sparkContext().statusTracker().getJobInfo(0).get().stageIds()) {
long writeRecords = getFirstStageData(spark, stageId).shuffleWriteRecords();
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 7b9590e5..a482e0df 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -103,8 +103,9 @@ public class GetReaderTest extends IntegrationTestBase {
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
JavaSparkContext jsc1 = new JavaSparkContext(sparkSession.sparkContext());
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD1 =
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
- ShuffleDependency shuffleDependency1 = (ShuffleDependency) javaPairRDD1.rdd().dependencies().head();
- RssShuffleHandle rssShuffleHandle1 = (RssShuffleHandle) shuffleDependency1.shuffleHandle();
+ ShuffleDependency<?, ?, ?> shuffleDependency1 =
+ (ShuffleDependency<?, ?, ?>) javaPairRDD1.rdd().dependencies().head();
+ RssShuffleHandle<?, ?, ?> rssShuffleHandle1 = (RssShuffleHandle<?, ?, ?>) shuffleDependency1.shuffleHandle();
RemoteStorageInfo remoteStorageInfo1 = rssShuffleHandle1.getRemoteStorage();
assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
assertTrue(remoteStorageInfo1.getConfItems().isEmpty());
@@ -112,23 +113,26 @@ public class GetReaderTest extends IntegrationTestBase {
// emptyRDD case
JavaPairRDD<String, Tuple2<Integer, Integer>> javaEmptyPairRDD1 =
TestUtils.combineByKeyRDD(
TestUtils.getEmptyRDD(jsc1));
- ShuffleDependency emptyShuffleDependency1 = (ShuffleDependency) javaEmptyPairRDD1.rdd().dependencies().head();
- RssShuffleHandle emptyRssShuffleHandle1 = (RssShuffleHandle) emptyShuffleDependency1.shuffleHandle();
+ ShuffleDependency<?, ?, ?> emptyShuffleDependency1 =
+ (ShuffleDependency<?, ?, ?>) javaEmptyPairRDD1.rdd().dependencies().head();
+ RssShuffleHandle<?, ?, ?> emptyRssShuffleHandle1 =
+ (RssShuffleHandle<?, ?, ?>) emptyShuffleDependency1.shuffleHandle();
assertEquals(javaEmptyPairRDD1.rdd().dependencies().head().rdd().getNumPartitions(), 0);
assertEquals(emptyRssShuffleHandle1.getPartitionToServers(), Collections.emptyMap());
assertEquals(emptyRssShuffleHandle1.getRemoteStorage(),RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
// the same app would get the same storage info
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD2 =
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
- ShuffleDependency shuffleDependency2 = (ShuffleDependency) javaPairRDD2.rdd().dependencies().head();
- RssShuffleHandle rssShuffleHandle2 = (RssShuffleHandle) shuffleDependency2.shuffleHandle();
+ ShuffleDependency<?, ?, ?> shuffleDependency2 =
+ (ShuffleDependency<?, ?, ?>) javaPairRDD2.rdd().dependencies().head();
+ RssShuffleHandle<?, ?, ?> rssShuffleHandle2 = (RssShuffleHandle<?, ?, ?>) shuffleDependency2.shuffleHandle();
RemoteStorageInfo remoteStorageInfo2 = rssShuffleHandle2.getRemoteStorage();
assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
assertTrue(remoteStorageInfo2.getConfItems().isEmpty());
RssShuffleManager rssShuffleManager = (RssShuffleManager)
sparkSession.sparkContext().env().shuffleManager();
- RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) shuffleDependency2.shuffleHandle();
- RssShuffleReader rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
+ RssShuffleHandle<?, ?, ?> rssShuffleHandle = (RssShuffleHandle<?, ?, ?>) shuffleDependency2.shuffleHandle();
+ RssShuffleReader<?, ?> rssShuffleReader = (RssShuffleReader<?, ?>) rssShuffleManager.getReader(
rssShuffleHandle, 0, 0, new MockTaskContext(), new TempShuffleReadMetrics());
Configuration hadoopConf = rssShuffleReader.getHadoopConf();
assertNull(hadoopConf.get("k1"));
@@ -141,8 +145,8 @@ public class GetReaderTest extends IntegrationTestBase {
rssShuffleManager.setAppId("test2");
JavaSparkContext jsc2 = new JavaSparkContext(sparkSession.sparkContext());
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD =
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc2));
- ShuffleDependency shuffleDependency = (ShuffleDependency) javaPairRDD.rdd().dependencies().head();
- rssShuffleHandle = (RssShuffleHandle) shuffleDependency.shuffleHandle();
+ ShuffleDependency<?, ?, ?> shuffleDependency = (ShuffleDependency<?, ?, ?>) javaPairRDD.rdd().dependencies().head();
+ rssShuffleHandle = (RssShuffleHandle<?, ?, ?>) shuffleDependency.shuffleHandle();
// the reason for sleep here is to ensure that threads can be scheduled normally
Thread.sleep(500);
RemoteStorageInfo remoteStorageInfo3 = rssShuffleHandle.getRemoteStorage();
@@ -151,7 +155,7 @@ public class GetReaderTest extends IntegrationTestBase {
assertEquals("v1", remoteStorageInfo3.getConfItems().get("k1"));
assertEquals("v2", remoteStorageInfo3.getConfItems().get("k2"));
- rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
+ rssShuffleReader = (RssShuffleReader<?, ?>) rssShuffleManager.getReader(
rssShuffleHandle, 0, 0, new MockTaskContext(), new TempShuffleReadMetrics());
hadoopConf = rssShuffleReader.getHadoopConf();
assertEquals("v1", hadoopConf.get("k1"));
@@ -161,7 +165,7 @@ public class GetReaderTest extends IntegrationTestBase {
assertNull(commonHadoopConf.get("k1"));
assertNull(commonHadoopConf.get("k2"));
- rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
+ rssShuffleReader = (RssShuffleReader<?, ?>) rssShuffleManager.getReader(
rssShuffleHandle, 0, 0, new MockTaskContext(), new TempShuffleReadMetrics());
hadoopConf = rssShuffleReader.getHadoopConf();
assertEquals("v1", hadoopConf.get("k1"));
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index ec7a7c93..6a7553ae 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -142,7 +142,7 @@ public class GetShuffleReportForMultiPartTest extends SparkIntegrationTestBase {
}
@Override
- Map runTest(SparkSession spark, String fileName) throws Exception {
+ Map<Integer, String> runTest(SparkSession spark, String fileName) throws Exception {
Thread.sleep(4000);
Map<Integer, String> map = Maps.newHashMap();
Dataset<Row> df2 = spark.range(0, 1000, 1, 10)
@@ -218,7 +218,7 @@ public class GetShuffleReportForMultiPartTest extends SparkIntegrationTestBase {
ShuffleReadMetricsReporter metrics,
Roaring64NavigableMap taskIdBitmap) {
int shuffleId = handle.shuffleId();
- RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) handle;
+ RssShuffleHandle<?, ?, ?> rssShuffleHandle = (RssShuffleHandle<?, ?, ?>) handle;
Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
rssShuffleHandle.getPartitionToServers();
int partitionNum = (int) allPartitionToServers.entrySet().stream()
.filter(x -> x.getKey() >= startPartition
&& x.getKey() < endPartition).count();