This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a0eec44 [ISSUE-154][Improvement] Support Empty assignment to Shuffle 
Server (#325)
1a0eec44 is described below

commit 1a0eec44fa2274b4c8bf70767434d043c5262ccc
Author: haorenhui <[email protected]>
AuthorDate: Wed Nov 16 15:02:02 2022 +0800

    [ISSUE-154][Improvement] Support Empty assignment to Shuffle Server (#325)
    
    ### What changes were proposed in this pull request?
    Following issue #154: when the RDD is empty, partitionNum is 0, so return an 
empty RssShuffleHandle directly
    
    ### Why are the changes needed?
    If the number of requested shuffle servers is 0, there is no need to contact 
a coordinator.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT
    
    Co-authored-by: haorenhui <[email protected]>
---
 .../java/org/apache/spark/shuffle/RssShuffleManager.java | 12 ++++++++++++
 .../java/org/apache/spark/shuffle/RssShuffleManager.java | 12 ++++++++++++
 .../src/test/java/org/apache/uniffle/test/TestUtils.java | 16 ++++++++++++++++
 .../test/java/org/apache/uniffle/test/GetReaderTest.java | 11 +++++++++++
 .../test/java/org/apache/uniffle/test/GetReaderTest.java | 10 ++++++++++
 5 files changed, 61 insertions(+)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f195b91e..ecdbdaac 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -212,6 +213,17 @@ public class RssShuffleManager implements ShuffleManager {
       LOG.info("Generate application id used in rss: " + appId);
     }
 
+    if (dependency.partitioner().numPartitions() == 0) {
+      LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], 
partitionNum is 0, "
+          + "return the empty RssShuffleHandle directly");
+      return new RssShuffleHandle(shuffleId,
+        appId,
+        dependency.rdd().getNumPartitions(),
+        dependency,
+        Collections.emptyMap(),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+    }
+
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
     RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
         sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c1cdc1c0..36ecbc8c 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -269,6 +270,17 @@ public class RssShuffleManager implements ShuffleManager {
     }
     LOG.info("Generate application id used in rss: " + id.get());
 
+    if (dependency.partitioner().numPartitions() == 0) {
+      LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], 
partitionNum is 0, "
+          + "return the empty RssShuffleHandle directly");
+      return new RssShuffleHandle(shuffleId,
+        id.get(),
+        dependency.rdd().getNumPartitions(),
+        dependency,
+        Collections.emptyMap(),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+    }
+
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
     RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
         sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
index b7bdbfc7..3d4a5056 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
@@ -17,11 +17,15 @@
 
 package org.apache.uniffle.test;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
 import com.google.common.collect.Lists;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import scala.Tuple2;
 
 public class TestUtils {
@@ -29,6 +33,18 @@ public class TestUtils {
 
   }
 
+  static JavaPairRDD<String, Integer> getEmptyRDD(JavaSparkContext jsc) {
+    JavaPairRDD<String, Integer> javaPairRDD1 = jsc.emptyRDD().flatMapToPair(
+        new PairFlatMapFunction<Object, String, Integer>() {
+          @Override
+          public Iterator<Tuple2<String, Integer>> call(Object s) throws 
Exception {
+            return new ArrayList<Tuple2<String, Integer>>().iterator();
+          }
+        }
+    );
+    return javaPairRDD1;
+  }
+
   static JavaPairRDD<String, Integer> getRDD(JavaSparkContext jsc) {
     JavaPairRDD<String, Integer> javaPairRDD1 = 
jsc.parallelizePairs(Lists.newArrayList(
         new Tuple2<>("cat1", 11), new Tuple2<>("dog", 22),
diff --git 
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
 
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index b273d47c..b83c6279 100644
--- 
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ 
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.test;
 
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -97,6 +98,16 @@ public class GetReaderTest extends IntegrationTestBase {
     assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
     assertTrue(remoteStorageInfo1.getConfItems().isEmpty());
 
+    // emptyRDD case
+    JavaPairRDD<String, Tuple2<Integer, Integer>> javaEmptyPairRDD1 = 
TestUtils.combineByKeyRDD(
+        TestUtils.getEmptyRDD(jsc1));
+    ShuffleDependency emptyShuffleDependency1 = (ShuffleDependency) 
javaEmptyPairRDD1.rdd().dependencies().head();
+    RssShuffleHandle emptyRssShuffleHandle1 = (RssShuffleHandle) 
emptyShuffleDependency1.shuffleHandle();
+    
assertEquals(javaEmptyPairRDD1.rdd().dependencies().head().rdd().getNumPartitions(),
 0);
+    assertEquals(emptyRssShuffleHandle1.getPartitionToServers(), 
Collections.emptyMap());
+    
assertEquals(emptyRssShuffleHandle1.getRemoteStorage(),RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+
+
     // the same app would get the same storage info
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD2 = 
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
     ShuffleDependency shuffleDependency2 = (ShuffleDependency) 
javaPairRDD2.rdd().dependencies().head();
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 113cb4b7..0d1271d8 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.test;
 
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -107,6 +108,15 @@ public class GetReaderTest extends IntegrationTestBase {
     assertEquals(remoteStorage1, remoteStorageInfo1.getPath());
     assertTrue(remoteStorageInfo1.getConfItems().isEmpty());
 
+    // emptyRDD case
+    JavaPairRDD<String, Tuple2<Integer, Integer>> javaEmptyPairRDD1 = 
TestUtils.combineByKeyRDD(
+        TestUtils.getEmptyRDD(jsc1));
+    ShuffleDependency emptyShuffleDependency1 = (ShuffleDependency) 
javaEmptyPairRDD1.rdd().dependencies().head();
+    RssShuffleHandle emptyRssShuffleHandle1 = (RssShuffleHandle) 
emptyShuffleDependency1.shuffleHandle();
+    
assertEquals(javaEmptyPairRDD1.rdd().dependencies().head().rdd().getNumPartitions(),
 0);
+    assertEquals(emptyRssShuffleHandle1.getPartitionToServers(), 
Collections.emptyMap());
+    
assertEquals(emptyRssShuffleHandle1.getRemoteStorage(),RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+
     // the same app would get the same storage info
     JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD2 = 
TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc1));
     ShuffleDependency shuffleDependency2 = (ShuffleDependency) 
javaPairRDD2.rdd().dependencies().head();

Reply via email to