This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1a0eec44 [ISSUE-154][Improvement] Support Empty assignment to Shuffle
Server (#325)
1a0eec44 is described below
commit 1a0eec44fa2274b4c8bf70767434d043c5262ccc
Author: haorenhui <[email protected]>
AuthorDate: Wed Nov 16 15:02:02 2022 +0800
[ISSUE-154][Improvement] Support Empty assignment to Shuffle Server (#325)
### What changes were proposed in this pull request?
Following issue #154: when the RDD is empty (partitionNum is 0), return an
empty RssShuffleHandle directly.
### Why are the changes needed?
If the number of requested shuffle servers is 0, there is no need to send a
request to the coordinator.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
Co-authored-by: haorenhui <[email protected]>
---
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 12 ++++++++++++
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 12 ++++++++++++
.../src/test/java/org/apache/uniffle/test/TestUtils.java | 16 ++++++++++++++++
.../test/java/org/apache/uniffle/test/GetReaderTest.java | 11 +++++++++++
.../test/java/org/apache/uniffle/test/GetReaderTest.java | 10 ++++++++++
5 files changed, 61 insertions(+)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f195b91e..ecdbdaac 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -212,6 +213,17 @@ public class RssShuffleManager implements ShuffleManager {
LOG.info("Generate application id used in rss: " + appId);
}
+ if (dependency.partitioner().numPartitions() == 0) {
+ LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "],
partitionNum is 0, "
+ + "return the empty RssShuffleHandle directly");
+ return new RssShuffleHandle(shuffleId,
+ appId,
+ dependency.rdd().getNumPartitions(),
+ dependency,
+ Collections.emptyMap(),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+ }
+
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c1cdc1c0..36ecbc8c 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -269,6 +270,17 @@ public class RssShuffleManager implements ShuffleManager {
}
LOG.info("Generate application id used in rss: " + id.get());
+ if (dependency.partitioner().numPartitions() == 0) {
+ LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "],
partitionNum is 0, "
+ + "return the empty RssShuffleHandle directly");
+ return new RssShuffleHandle(shuffleId,
+ id.get(),
+ dependency.rdd().getNumPartitions(),
+ dependency,
+ Collections.emptyMap(),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+ }
+
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
index b7bdbfc7..3d4a5056 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
@@ -17,11 +17,15 @@
package org.apache.uniffle.test;
+import java.util.ArrayList;
+import java.util.Iterator;
+
import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
public class TestUtils {
@@ -29,6 +33,18 @@ public class TestUtils {
}
+ static JavaPairRDD<String, Integer> getEmptyRDD(JavaSparkContext jsc) {
+ JavaPairRDD<String, Integer> javaPairRDD1 = jsc.emptyRDD().flatMapToPair(
+ new PairFlatMapFunction<Object, String, Integer>() {
+ @Override
+ public Iterator<Tuple2<String, Integer>> call(Object s) throws
Exception {
+ return new ArrayList<Tuple2<String, Integer>>().iterator();
+ }
+ }
+ );
+ return javaPairRDD1;
+ }
+
static JavaPairRDD<String, Integer> getRDD(JavaSparkContext jsc) {
JavaPairRDD<String, Integer> javaPairRDD1 =
jsc.parallelizePairs(Lists.newArrayList(
new Tuple2<>("cat1", 11), new Tuple2<>("dog", 22),
diff --git
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index b273d47c..b83c6279 100644
---
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.test;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -97,6 +98,16 @@ public class GetReaderTest extends IntegrationTestBase {
assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
assertTrue(remoteStorageInfo1.getConfItems().isEmpty());
+ // emptyRDD case
+ JavaPairRDD<String, Tuple2<Integer, Integer>> javaEmptyPairRDD1 =
TestUtils.combineByKeyRDD(
+ TestUtils.getEmptyRDD(jsc1));
+ ShuffleDependency emptyShuffleDependency1 = (ShuffleDependency)
javaEmptyPairRDD1.rdd().dependencies().head();
+ RssShuffleHandle emptyRssShuffleHandle1 = (RssShuffleHandle)
emptyShuffleDependency1.shuffleHandle();
+
assertEquals(javaEmptyPairRDD1.rdd().dependencies().head().rdd().getNumPartitions(),
0);
+ assertEquals(emptyRssShuffleHandle1.getPartitionToServers(),
Collections.emptyMap());
+
assertEquals(emptyRssShuffleHandle1.getRemoteStorage(),RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+
+
// the same app would get the same storage info
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD2 =
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
ShuffleDependency shuffleDependency2 = (ShuffleDependency)
javaPairRDD2.rdd().dependencies().head();
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 113cb4b7..0d1271d8 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.test;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -107,6 +108,15 @@ public class GetReaderTest extends IntegrationTestBase {
assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
assertTrue(remoteStorageInfo1.getConfItems().isEmpty());
+ // emptyRDD case
+ JavaPairRDD<String, Tuple2<Integer, Integer>> javaEmptyPairRDD1 =
TestUtils.combineByKeyRDD(
+ TestUtils.getEmptyRDD(jsc1));
+ ShuffleDependency emptyShuffleDependency1 = (ShuffleDependency)
javaEmptyPairRDD1.rdd().dependencies().head();
+ RssShuffleHandle emptyRssShuffleHandle1 = (RssShuffleHandle)
emptyShuffleDependency1.shuffleHandle();
+
assertEquals(javaEmptyPairRDD1.rdd().dependencies().head().rdd().getNumPartitions(),
0);
+ assertEquals(emptyRssShuffleHandle1.getPartitionToServers(),
Collections.emptyMap());
+
assertEquals(emptyRssShuffleHandle1.getRemoteStorage(),RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+
// the same app would get the same storage info
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD2 =
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
ShuffleDependency shuffleDependency2 = (ShuffleDependency)
javaPairRDD2.rdd().dependencies().head();