This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch branch-0.2.0 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 7f3c44a9a051310e991034162ef53e2835490e71 Author: roryqi <jerqi1242949...@gmail.com> AuthorDate: Tue Mar 1 20:33:34 2022 +0800 [Feature] [0.2] Support Spark 3.2 (#88) ### What changes were proposed in this pull request? Support Spark 3.2 ### Why are the changes needed? We need to support more Spark versions. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA passed (including profiles spark2,spark3,spark3.0,spark3.1,spark3.2) Co-authored-by: roryqi <ror...@tencent.com> --- README.md | 2 +- .../spark/shuffle/writer/WriteBufferManager.java | 3 +- .../spark/shuffle/writer/RssShuffleWriter.java | 5 + .../tencent/rss/test/SparkIntegrationTestBase.java | 4 + integration-test/spark3/pom.xml | 2 + pom.xml | 106 ++++++++++++++++++++- 6 files changed, 119 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a785f47..ac3e92a 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ The shuffle data is stored with index file and data file. Data file has all bloc ![Rss Shuffle_Write](docs/asset/rss_data_format.png) ## Supported Spark Version -Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x +Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x, Spark 3.2.x Note: To support dynamic allocation, the patch(which is included in client-spark/patch folder) should be applied to Spark diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java index 1b26f0b..91cc6a7 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import org.apache.spark.executor.ShuffleWriteMetrics; import 
org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.MemoryMode; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.SerializationStream; import org.apache.spark.serializer.Serializer; @@ -86,7 +87,7 @@ public class WriteBufferManager extends MemoryConsumer { Map<Integer, List<ShuffleServerInfo>> partitionToServers, TaskMemoryManager taskMemoryManager, ShuffleWriteMetrics shuffleWriteMetrics) { - super(taskMemoryManager); + super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP); this.bufferSize = bufferManagerOptions.getBufferSize(); this.spillSize = bufferManagerOptions.getBufferSpillThreshold(); this.instance = serializer.newInstance(); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 2a4beb6..a7e4480 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -171,6 +171,11 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { + bufferManager.getManagerCostInfo()); } + // only push-based shuffle use this interface, but rss won't be used when push-based shuffle is enabled. 
+ public long[] getPartitionLengths() { + return new long[0]; + } + private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList, Set<Long> blockIds) { if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) { shuffleBlockInfoList.forEach(sbi -> { diff --git a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java index 06789d2..1e15ba6 100644 --- a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java +++ b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java @@ -21,6 +21,9 @@ package com.tencent.rss.test; import static org.junit.Assert.assertEquals; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.SparkConf; import org.apache.spark.shuffle.RssClientConfig; import org.apache.spark.sql.SparkSession; @@ -50,6 +53,7 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase { Map resultWithoutRss = runSparkApp(sparkConf, fileName); long durationWithoutRss = System.currentTimeMillis() - start; + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); updateSparkConfWithRss(sparkConf); updateSparkConfCustomer(sparkConf); start = System.currentTimeMillis(); diff --git a/integration-test/spark3/pom.xml b/integration-test/spark3/pom.xml index d4d024a..5dc2f39 100644 --- a/integration-test/spark3/pom.xml +++ b/integration-test/spark3/pom.xml @@ -108,11 +108,13 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> <scope>test</scope> </dependency> 
diff --git a/pom.xml b/pom.xml index 628c789..1cc35f3 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ <maven.compiler.target>${java.version}</maven.compiler.target> <metrics.version>3.1.0</metrics.version> <mockito.inline.version>3.5.15</mockito.inline.version> - <netty.version>4.1.47.Final</netty.version> + <netty.version>4.1.68.Final</netty.version> <picocli.version>4.5.2</picocli.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <prometheus.simpleclient.version>0.9.0</prometheus.simpleclient.version> @@ -1010,6 +1010,7 @@ <spark.version>2.3.4</spark.version> <client.type>2</client.type> <jackson.version>2.9.0</jackson.version> + <netty.version>4.1.47.Final</netty.version> </properties> <modules> <module>client-spark/common</module> @@ -1018,6 +1019,11 @@ </modules> <dependencyManagement> <dependencies> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>${netty.version}</version> + </dependency> <dependency> <groupId>com.tencent.rss</groupId> <artifactId>rss-client-spark2</artifactId> @@ -1094,6 +1100,104 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.tencent.rss</groupId> + <artifactId>rss-client-spark-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.tencent.rss</groupId> + <artifactId>rss-client-spark-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>com.tencent.rss</groupId> + <artifactId>rss-integration-common-test</artifactId> + 
<version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.tencent.rss</groupId> + <artifactId>rss-integration-spark-common-test</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + </dependencyManagement> + </profile> + + <profile> + <id>spark3.2</id> + <properties> + <scala.binary.version>2.12</scala.binary.version> + <spark.version>3.2.1</spark.version> + <client.type>3</client.type> + <jackson.version>2.12.0</jackson.version> + </properties> + <modules> + <module>client-spark/common</module> + <module>client-spark/spark3</module> + <module>integration-test/spark-common</module> + <module>integration-test/spark3</module> + </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.tencent.rss</groupId> + <artifactId>rss-client-spark3</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + 
<artifactId>jackson-core</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.tencent.rss</groupId>