This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a8d20f2a [#635] feat(client): enable LOCAL_ORDER by default for Spark 
AQE  (#644)
a8d20f2a is described below

commit a8d20f2a30bab1d1a6d0df37f87e8998ab87d28d
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Feb 23 09:50:17 2023 +0800

    [#635] feat(client): enable LOCAL_ORDER by default for Spark AQE  (#644)
    
    ### What changes were proposed in this pull request?
    enable LOCAL_ORDER by default for Spark AQE
    
    ### Why are the changes needed?
    Currently, the LOCAL_ORDER data distribution type must be activated 
explicitly. It's better to enable it by default when AQE is in use.
    Fix: #635
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    1. Existing UTs
---
 client-spark/spark3/pom.xml                        |  6 ++
 .../apache/spark/shuffle/RssShuffleManager.java    | 15 +++-
 .../spark/shuffle/RssShuffleManagerTest.java       | 86 ++++++++++++++++++++++
 docs/client_guide.md                               |  9 +--
 4 files changed, 108 insertions(+), 8 deletions(-)

diff --git a/client-spark/spark3/pom.xml b/client-spark/spark3/pom.xml
index 5860d6c8..38147246 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/spark3/pom.xml
@@ -79,6 +79,12 @@
             <artifactId>hadoop-minicluster</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index a37453ee..e08607ec 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -49,6 +49,7 @@ import org.apache.spark.shuffle.writer.AddBlockEvent;
 import org.apache.spark.shuffle.writer.BufferManagerOptions;
 import org.apache.spark.shuffle.writer.RssShuffleWriter;
 import org.apache.spark.shuffle.writer.WriteBufferManager;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.BlockManagerId;
 import org.apache.spark.util.EventLoop;
@@ -71,6 +72,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -177,7 +179,7 @@ public class RssShuffleManager implements ShuffleManager {
     final int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dynamicConfEnabled = 
sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
-    this.dataDistributionType = 
RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+    this.dataDistributionType = getDataDistributionType(sparkConf);
     long retryIntervalMax = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
     int heartBeatThreadNum = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataTransferPoolSize = 
sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
@@ -220,6 +222,17 @@ public class RssShuffleManager implements ShuffleManager {
     }
   }
 
+  @VisibleForTesting
+  protected static ShuffleDataDistributionType 
getDataDistributionType(SparkConf sparkConf) {
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    if ((boolean) sparkConf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED())
+        && !rssConf.containsKey(RssClientConf.DATA_DISTRIBUTION_TYPE.key())) {
+      return ShuffleDataDistributionType.LOCAL_ORDER;
+    }
+
+    return rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+  }
+
   // For testing only
   @VisibleForTesting
   RssShuffleManager(
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
new file mode 100644
index 00000000..00bc591c
--- /dev/null
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RssShuffleManagerTest {
+  private static final String SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY = 
"spark.sql.adaptive.enabled";
+
+  @Test
+  public void testGetDataDistributionType() {
+    // case1
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "true");
+    assertEquals(
+        ShuffleDataDistributionType.LOCAL_ORDER,
+        RssShuffleManager.getDataDistributionType(sparkConf)
+    );
+
+    // case2
+    sparkConf = new SparkConf();
+    sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "false");
+    assertEquals(
+        RssClientConf.DATA_DISTRIBUTION_TYPE.defaultValue(),
+        RssShuffleManager.getDataDistributionType(sparkConf)
+    );
+
+    // case3
+    sparkConf = new SparkConf();
+    sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "true");
+    sparkConf.set("spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(), 
ShuffleDataDistributionType.NORMAL.name());
+    assertEquals(
+        ShuffleDataDistributionType.NORMAL,
+        RssShuffleManager.getDataDistributionType(sparkConf)
+    );
+
+    // case4
+    sparkConf = new SparkConf();
+    sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "true");
+    sparkConf.set(
+        "spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(),
+        ShuffleDataDistributionType.LOCAL_ORDER.name()
+    );
+    assertEquals(
+        ShuffleDataDistributionType.LOCAL_ORDER,
+        RssShuffleManager.getDataDistributionType(sparkConf)
+    );
+
+    // case5
+    sparkConf = new SparkConf();
+    boolean aqeEnable = (boolean) 
sparkConf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED());
+    if (aqeEnable) {
+      assertEquals(
+          ShuffleDataDistributionType.LOCAL_ORDER,
+          RssShuffleManager.getDataDistributionType(sparkConf)
+      );
+    } else {
+      assertEquals(
+          RssClientConf.DATA_DISTRIBUTION_TYPE.defaultValue(),
+          RssShuffleManager.getDataDistributionType(sparkConf)
+      );
+    }
+  }
+}
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 8b1b008e..413a5334 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -62,13 +62,8 @@ After apply the patch and rebuild spark, add following 
configuration in spark co
 To improve performance of AQE skew optimization, uniffle introduces the 
LOCAL_ORDER shuffle-data distribution mechanism 
 and Continuous partition assignment mechanism.
 
-1. LOCAL_ORDER shuffle-data distribution mechanism filter the lots of data to 
reduce network bandwidth and shuffle-server local-disk pressure.
-
-    It can be enabled by the following config
-      ```bash
-      # Default value is NORMAL, it will directly append to file when the 
memory data is flushed to external storage 
-      spark.rss.client.shuffle.data.distribution.type LOCAL_ORDER
-      ```
+1. The LOCAL_ORDER shuffle-data distribution mechanism filters out a large 
amount of data to reduce network bandwidth and shuffle-server local-disk pressure. 
+   It will be enabled by default when AQE is enabled.
 
 2. Continuous partition assignment mechanism assign consecutive partitions to 
the same ShuffleServer to reduce the frequency of getShuffleResult.
 

Reply via email to