This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5321353b24db [SPARK-47875][CORE] Remove `spark.deploy.recoverySerializer` 5321353b24db is described below commit 5321353b24db247087890c44de06b9ad4e136473 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Tue Apr 16 16:47:23 2024 -0700 [SPARK-47875][CORE] Remove `spark.deploy.recoverySerializer` ### What changes were proposed in this pull request? This is a logical revert of SPARK-46205 - #44113 - #44118 ### Why are the changes needed? The initial implementation didn't handle the class initialization logic properly. Until we have a fix, I'd like to revert this from `master` branch. ### Does this PR introduce _any_ user-facing change? No, this is not released yet. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46087 from dongjoon-hyun/SPARK-47875. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../PersistenceEngineBenchmark-jdk21-results.txt | 7 ------ .../PersistenceEngineBenchmark-results.txt | 7 ------ .../org/apache/spark/deploy/master/Master.scala | 7 ++---- .../org/apache/spark/internal/config/Deploy.scala | 14 ------------ .../deploy/master/PersistenceEngineBenchmark.scala | 4 ++-- .../deploy/master/PersistenceEngineSuite.scala | 14 +----------- .../apache/spark/deploy/master/RecoverySuite.scala | 25 ++-------------------- docs/spark-standalone.md | 12 ++--------- 8 files changed, 9 insertions(+), 81 deletions(-) diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt index 2a6bd778fc8a..ae4e0071adb0 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt @@ -7,19 +7,12 @@ AMD EPYC 7763 64-Core Processor 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
---------------------------------------------------------------------------------------------------------------------------------------- ZooKeeperPersistenceEngine with JavaSerializer 5036 5232 229 0.0 5035730.1 1.0X -ZooKeeperPersistenceEngine with KryoSerializer 4038 4053 16 0.0 4038447.8 1.2X FileSystemPersistenceEngine with JavaSerializer 2902 2906 5 0.0 2902453.3 1.7X FileSystemPersistenceEngine with JavaSerializer (lz4) 816 829 19 0.0 816173.1 6.2X FileSystemPersistenceEngine with JavaSerializer (lzf) 755 780 33 0.0 755209.0 6.7X FileSystemPersistenceEngine with JavaSerializer (snappy) 814 832 16 0.0 813672.5 6.2X FileSystemPersistenceEngine with JavaSerializer (zstd) 987 1014 45 0.0 986834.7 5.1X -FileSystemPersistenceEngine with KryoSerializer 687 698 14 0.0 687313.5 7.3X -FileSystemPersistenceEngine with KryoSerializer (lz4) 590 599 15 0.0 589867.9 8.5X -FileSystemPersistenceEngine with KryoSerializer (lzf) 915 922 9 0.0 915432.2 5.5X -FileSystemPersistenceEngine with KryoSerializer (snappy) 768 795 37 0.0 768494.4 6.6X -FileSystemPersistenceEngine with KryoSerializer (zstd) 898 950 45 0.0 898118.6 5.6X RocksDBPersistenceEngine with JavaSerializer 299 299 0 0.0 298800.0 16.9X -RocksDBPersistenceEngine with KryoSerializer 112 113 1 0.0 111779.6 45.1X BlackHolePersistenceEngine 0 0 0 5.5 180.3 27924.2X diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt index da1838608de1..ec9a6fc1c8cf 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt @@ -7,19 +7,12 @@ AMD EPYC 7763 64-Core Processor 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------------- ZooKeeperPersistenceEngine with JavaSerializer 5192 5309 116 0.0 5192160.2 1.0X -ZooKeeperPersistenceEngine with 
KryoSerializer 4056 4059 5 0.0 4055626.8 1.3X FileSystemPersistenceEngine with JavaSerializer 2926 2934 8 0.0 2926383.4 1.8X FileSystemPersistenceEngine with JavaSerializer (lz4) 820 827 11 0.0 820359.8 6.3X FileSystemPersistenceEngine with JavaSerializer (lzf) 772 781 9 0.0 772349.1 6.7X FileSystemPersistenceEngine with JavaSerializer (snappy) 802 812 10 0.0 801815.8 6.5X FileSystemPersistenceEngine with JavaSerializer (zstd) 972 994 31 0.0 972042.3 5.3X -FileSystemPersistenceEngine with KryoSerializer 708 726 15 0.0 707927.8 7.3X -FileSystemPersistenceEngine with KryoSerializer (lz4) 584 596 11 0.0 583999.8 8.9X -FileSystemPersistenceEngine with KryoSerializer (lzf) 880 896 14 0.0 880189.2 5.9X -FileSystemPersistenceEngine with KryoSerializer (snappy) 772 821 46 0.0 772130.1 6.7X -FileSystemPersistenceEngine with KryoSerializer (zstd) 906 928 29 0.0 905578.7 5.7X RocksDBPersistenceEngine with JavaSerializer 302 302 0 0.0 301664.5 17.2X -RocksDBPersistenceEngine with KryoSerializer 109 111 2 0.0 108979.5 47.6X BlackHolePersistenceEngine 0 0 0 6.3 158.3 32800.5X diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 283443425635..e02d45105727 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -42,7 +42,7 @@ import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement, ResourceUtils} import org.apache.spark.rpc._ -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} +import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -179,10 +179,7 @@ private[deploy] class 
Master( masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) - val serializer = RecoverySerializer.withName(conf.get(RECOVERY_SERIALIZER)) match { - case RecoverySerializer.JAVA => new JavaSerializer(conf) - case RecoverySerializer.KRYO => new KryoSerializer(conf) - } + val serializer = new JavaSerializer(conf) val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match { case "ZOOKEEPER" => logInfo("Persisting recovery state to ZooKeeper") diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala index b09fbd7a5bb2..0c2db21905d1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala @@ -26,20 +26,6 @@ private[spark] object Deploy { .stringConf .createWithDefault("NONE") - object RecoverySerializer extends Enumeration { - val JAVA, KRYO = Value - } - - val RECOVERY_SERIALIZER = ConfigBuilder("spark.deploy.recoverySerializer") - .doc("Serializer for writing/reading objects to/from persistence engines; " + - "JAVA or KRYO. Java serializer has been the default mode since Spark 0.8.1." + - "KRYO serializer is a new fast and compact mode from Spark 4.0.0.") - .version("4.0.0") - .stringConf - .transform(_.toUpperCase(Locale.ROOT)) - .checkValues(RecoverySerializer.values.map(_.toString)) - .createWithDefault(RecoverySerializer.JAVA.toString) - val RECOVERY_COMPRESSION_CODEC = ConfigBuilder("spark.deploy.recoveryCompressionCodec") .doc("A compression codec for persistence engines. none (default), lz4, lzf, snappy, and " + "zstd. 
Currently, only FILESYSTEM mode supports this configuration.") diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala index 34a447efe528..2a06ee5ed947 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.{DeployTestUtils, DriverDescription} import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL import org.apache.spark.io.CompressionCodec import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.Utils @@ -49,7 +49,7 @@ import org.apache.spark.util.Utils object PersistenceEngineBenchmark extends BenchmarkBase { val conf = new SparkConf() - val serializers = Seq(new JavaSerializer(conf), new KryoSerializer(conf)) + val serializers = Seq(new JavaSerializer(conf)) val zkTestServer = new TestingServer(findFreePort(conf)) override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 01b7e46eb2a8..6839afdeeff8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL import org.apache.spark.io.CompressionCodec import org.apache.spark.rpc.{RpcEndpoint, RpcEnv} -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} +import 
org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.util.Utils class PersistenceEngineSuite extends SparkFunSuite { @@ -103,18 +103,6 @@ class PersistenceEngineSuite extends SparkFunSuite { } } - test("SPARK-46205: Support KryoSerializer in FileSystemPersistenceEngine") { - withTempDir { dir => - val conf = new SparkConf() - val serializer = new KryoSerializer(conf) - val engine = new FileSystemPersistenceEngine(dir.getAbsolutePath, serializer) - engine.persist("test_1", "test_1_value") - engine.read[String]("test_1") - engine.unpersist("test_1") - engine.close() - } - } - test("SPARK-46216: FileSystemPersistenceEngine with compression") { val conf = new SparkConf() CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c => diff --git a/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala index 5e2939738cdf..18b22e7352c9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala @@ -38,7 +38,7 @@ import org.apache.spark.io.LZ4CompressionCodec import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement} import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv} -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.JavaSerializer class RecoverySuite extends MasterSuiteBase { test("can use a custom recovery mode factory") { @@ -474,26 +474,6 @@ class RecoverySuite extends MasterSuiteBase { } } - test("SPARK-46205: Recovery with Kryo Serializer") { - val conf = new SparkConf(loadDefaults = false) - conf.set(RECOVERY_MODE, "FILESYSTEM") - conf.set(RECOVERY_SERIALIZER, "Kryo") - conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir")) - - var master: Master = null - try { - master = makeAliveMaster(conf) - 
val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine] - assert(e.serializer.isInstanceOf[KryoSerializer]) - } finally { - if (master != null) { - master.rpcEnv.shutdown() - master.rpcEnv.awaitTermination() - master = null - } - } - } - test("SPARK-46216: Recovery without compression") { val conf = new SparkConf(loadDefaults = false) conf.set(RECOVERY_MODE, "FILESYSTEM") @@ -536,14 +516,13 @@ class RecoverySuite extends MasterSuiteBase { test("SPARK-46258: Recovery with RocksDB") { val conf = new SparkConf(loadDefaults = false) conf.set(RECOVERY_MODE, "ROCKSDB") - conf.set(RECOVERY_SERIALIZER, "Kryo") conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir")) var master: Master = null try { master = makeAliveMaster(conf) val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine] - assert(e.serializer.isInstanceOf[KryoSerializer]) + assert(e.serializer.isInstanceOf[JavaSerializer]) } finally { if (master != null) { master.rpcEnv.shutdown() diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 1eab3158e2e5..774c0bee3129 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -793,19 +793,11 @@ In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spa <td><code>spark.deploy.recoveryDirectory</code></td> <td>""</td> <td>The directory in which Spark will store recovery state, accessible from the Master's perspective. - Note that the directory should be clearly manualy if <code>spark.deploy.recoveryMode</code>, - <code>spark.deploy.recoverySerializer</code>, or <code>spark.deploy.recoveryCompressionCodec</code> is changed. + Note that the directory should be cleared manually if <code>spark.deploy.recoveryMode</code> + or <code>spark.deploy.recoveryCompressionCodec</code> is changed. 
</td> <td>0.8.1</td> </tr> - <tr> - <td><code>spark.deploy.recoverySerializer</code></td> - <td>JAVA</td> - <td>A serializer for writing/reading objects to/from persistence engines; JAVA (default) or KRYO. - Java serializer has been the default mode since Spark 0.8.1. - Kryo serializer is a new fast and compact mode from Spark 4.0.0.</td> - <td>4.0.0</td> - </tr> <tr> <td><code>spark.deploy.recoveryCompressionCodec</code></td> <td>(none)</td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org