(spark) branch master updated: [SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in spark ML reader/writer
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new fba4c8c20e52 [SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in spark ML reader/writer

fba4c8c20e52 is described below

commit fba4c8c20e523c9a441f007442efd616320e7be4
Author: Weichen Xu
AuthorDate: Tue Jul 23 19:19:28 2024 +0800

[SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in spark ML reader/writer

### What changes were proposed in this pull request?
`SparkSession.getActiveSession` returns a thread-local session, but the Spark ML reader / writer might be executed in a different thread, in which case `SparkSession.getActiveSession` returns None.

### Why are the changes needed?
It fixes bugs like the following:
```
    spark = SparkSession.getActiveSession()
>   spark.createDataFrame(  # type: ignore[union-attr]
        [(metadataJson,)], schema=["value"]
    ).coalesce(1).write.text(metadataPath)
E   AttributeError: 'NoneType' object has no attribute 'createDataFrame'
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47453 from WeichenXu123/SPARK-48970.

Authored-by: Weichen Xu
Signed-off-by: Weichen Xu
---
 .../src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 2 +-
 python/pyspark/ml/util.py | 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 021595f76c24..c127575e1470 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -588,7 +588,7 @@ private[ml] object DefaultParamsReader {
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val spark = SparkSession.getActiveSession.get
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
     val metadataStr = spark.read.text(metadataPath).first().getString(0)
     parseMetadata(metadataStr, expectedClassName)
   }
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index 5e7965554d82..89e2f9631564 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -464,10 +464,10 @@ class DefaultParamsWriter(MLWriter):
         metadataJson = DefaultParamsWriter._get_metadata_to_save(
             instance, sc, extraMetadata, paramMap
         )
-        spark = SparkSession.getActiveSession()
-        spark.createDataFrame(  # type: ignore[union-attr]
-            [(metadataJson,)], schema=["value"]
-        ).coalesce(1).write.text(metadataPath)
+        spark = SparkSession._getActiveSessionOrCreate()
+        spark.createDataFrame([(metadataJson,)], schema=["value"]).coalesce(1).write.text(
+            metadataPath
+        )
 
     @staticmethod
     def _get_metadata_to_save(
@@ -580,8 +580,8 @@ class DefaultParamsReader(MLReader[RL]):
             If non empty, this is checked against the loaded metadata.
         """
         metadataPath = os.path.join(path, "metadata")
-        spark = SparkSession.getActiveSession()
-        metadataStr = spark.read.text(metadataPath).first()[0]  # type: ignore[union-attr,index]
+        spark = SparkSession._getActiveSessionOrCreate()
+        metadataStr = spark.read.text(metadataPath).first()[0]  # type: ignore[index]
         loadedVals = DefaultParamsReader._parseMetaData(metadataStr, expectedClassName)
         return loadedVals

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
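The bug above comes from a lookup that only sees state registered on the calling thread. The sketch below reproduces that failure mode in plain Python with `threading.local`. It is a minimal illustration under stated assumptions. `Session`, `SessionRegistry` and its methods are hypothetical stand-ins, not PySpark classes. They only contrast a thread-local lookup with one that falls back to a shared session, which is roughly the direction the commit takes.

```python
import threading
from typing import Optional


class Session:
    """Toy stand-in for a session object (not pyspark.sql.SparkSession)."""

    def __init__(self, name: str) -> None:
        self.name = name


class SessionRegistry:
    """Hypothetical registry contrasting two lookup styles.

    get_active() only sees a session registered on the *current* thread,
    similar in spirit to SparkSession.getActiveSession.
    get_active_or_create() falls back to a process-wide default session,
    similar in spirit to _getActiveSessionOrCreate / builder().getOrCreate().
    """

    def __init__(self) -> None:
        self._local = threading.local()
        self._default: Optional[Session] = None
        self._lock = threading.Lock()

    def set_active(self, session: Session) -> None:
        self._local.session = session
        with self._lock:
            if self._default is None:
                self._default = session

    def get_active(self) -> Optional[Session]:
        # Returns None on any thread that never called set_active().
        return getattr(self._local, "session", None)

    def get_active_or_create(self) -> Session:
        active = self.get_active()
        if active is not None:
            return active
        with self._lock:
            if self._default is None:
                self._default = Session("default")
            return self._default


def lookup_from_worker_thread(registry: SessionRegistry, method_name: str):
    """Run a registry lookup on a fresh thread and return its result."""
    result = {}

    def worker() -> None:
        result["session"] = getattr(registry, method_name)()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result["session"]


registry = SessionRegistry()
registry.set_active(Session("main"))
print(lookup_from_worker_thread(registry, "get_active"))  # None
print(lookup_from_worker_thread(registry, "get_active_or_create").name)  # main
```

The worker thread's `get_active()` returns `None` even though the main thread registered a session. That is the same shape as the `'NoneType' object has no attribute 'createDataFrame'` error quoted in the commit message.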
(spark) branch master updated: [SPARK-48941][PYTHON][ML] Replace RDD read / write API invocation with Dataframe read / write API
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new be1afd504a44 [SPARK-48941][PYTHON][ML] Replace RDD read / write API invocation with Dataframe read / write API

be1afd504a44 is described below

commit be1afd504a44ea6058f764e0adf7140eedf704db
Author: Weichen Xu
AuthorDate: Mon Jul 22 21:18:54 2024 +0800

[SPARK-48941][PYTHON][ML] Replace RDD read / write API invocation with Dataframe read / write API

### What changes were proposed in this pull request?
PySpark ML: Replace RDD read / write API invocations with DataFrame read / write API invocations.

### Why are the changes needed?
Follow-up of https://github.com/apache/spark/pull/47341

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47411 from WeichenXu123/SPARK-48909-follow-up.

Authored-by: Weichen Xu
Signed-off-by: Weichen Xu
---
 python/pyspark/ml/util.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index b9a2829a1ca0..5e7965554d82 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -464,7 +464,10 @@ class DefaultParamsWriter(MLWriter):
         metadataJson = DefaultParamsWriter._get_metadata_to_save(
             instance, sc, extraMetadata, paramMap
         )
-        sc.parallelize([metadataJson], 1).saveAsTextFile(metadataPath)
+        spark = SparkSession.getActiveSession()
+        spark.createDataFrame(  # type: ignore[union-attr]
+            [(metadataJson,)], schema=["value"]
+        ).coalesce(1).write.text(metadataPath)
 
     @staticmethod
     def _get_metadata_to_save(
@@ -577,7 +580,8 @@ class DefaultParamsReader(MLReader[RL]):
             If non empty, this is checked against the loaded metadata.
         """
         metadataPath = os.path.join(path, "metadata")
-        metadataStr = sc.textFile(metadataPath, 1).first()
+        spark = SparkSession.getActiveSession()
+        metadataStr = spark.read.text(metadataPath).first()[0]  # type: ignore[union-attr,index]
         loadedVals = DefaultParamsReader._parseMetaData(metadataStr, expectedClassName)
         return loadedVals
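The writer stores the metadata JSON as a one-row, one-column DataFrame, coalesces it to a single partition and writes it as text. The reader takes the first line back with `first()[0]`. The sketch below imitates that round trip on the local filesystem so the data layout is visible without Spark. The helper names and the `part-00000.txt` file name are illustrative assumptions; Spark's real part-file names differ. It assumes the JSON fits on a single line, which `json.dumps` produces by default.

```python
import json
import os
import tempfile


def write_single_line_text_dir(line: str, path: str) -> None:
    """Mimic a one-row, one-partition text write: `path` becomes a directory
    containing a single part file whose only line is `line`."""
    if "\n" in line:
        raise ValueError("a text row must not contain a newline")
    os.makedirs(path)  # fails if the directory already exists
    with open(os.path.join(path, "part-00000.txt"), "w", encoding="utf-8") as f:
        f.write(line + "\n")


def read_first_line_of_text_dir(path: str) -> str:
    """Mimic spark.read.text(path).first()[0] for a directory of part files."""
    for name in sorted(os.listdir(path)):
        if not name.startswith("part-"):
            continue
        with open(os.path.join(path, name), encoding="utf-8") as f:
            first = f.readline()
        if first:
            return first.rstrip("\n")
    raise ValueError(f"no rows found under {path}")


with tempfile.TemporaryDirectory() as root:
    metadata_path = os.path.join(root, "metadata")
    metadata_json = json.dumps({"class": "ExampleModel", "paramMap": {"maxIter": 10}})
    write_single_line_text_dir(metadata_json, metadata_path)
    loaded = json.loads(read_first_line_of_text_dir(metadata_path))
    print(loaded["paramMap"]["maxIter"])  # 10
```

Coalescing to one partition is what makes "the first line" well defined here: there is exactly one part file with exactly one row.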
(spark) branch master updated: [SPARK-48463][ML] Make StringIndexer supporting nested input columns
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 9bff2c8bc505 [SPARK-48463][ML] Make StringIndexer supporting nested input columns

9bff2c8bc505 is described below

commit 9bff2c8bc5059f5be0dc6e8105c11403942a0b9f
Author: Weichen Xu
AuthorDate: Mon Jul 15 15:19:59 2024 +0800

[SPARK-48463][ML] Make StringIndexer supporting nested input columns

### What changes were proposed in this pull request?
Make StringIndexer support nested input columns.

### Why are the changes needed?
User demand.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes #47283 from WeichenXu123/SPARK-48463.

Lead-authored-by: Weichen Xu
Co-authored-by: WeichenXu
Signed-off-by: Weichen Xu
---
 .../apache/spark/ml/feature/StringIndexer.scala | 37 +++--
 .../spark/ml/feature/StringIndexerSuite.scala | 47 +-
 2 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 60dc4d024071..34f77f029395 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.feature
 
+import java.util.ArrayList
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
@@ -27,7 +29,7 @@ import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, Encoders, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Encoder, Encoders, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{If, Literal}
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.functions._
@@ -103,8 +105,8 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi
   private def validateAndTransformField(
       schema: StructType,
       inputColName: String,
+      inputDataType: DataType,
       outputColName: String): StructField = {
-    val inputDataType = schema(inputColName).dataType
     require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType],
       s"The input column $inputColName must be either string type or numeric type, " +
         s"but got $inputDataType.")
@@ -122,12 +124,22 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi
     require(outputColNames.distinct.length == outputColNames.length,
       s"Output columns should not be duplicate.")
 
+    val sparkSession = SparkSession.getActiveSession.get
+    val transformDataset = sparkSession.createDataFrame(new ArrayList[Row](), schema = schema)
     val outputFields = inputColNames.zip(outputColNames).flatMap {
       case (inputColName, outputColName) =>
-        schema.fieldNames.contains(inputColName) match {
-          case true => Some(validateAndTransformField(schema, inputColName, outputColName))
-          case false if skipNonExistsCol => None
-          case _ => throw new SparkException(s"Input column $inputColName does not exist.")
+        try {
+          val dtype = transformDataset.col(inputColName).expr.dataType
+          Some(
+            validateAndTransformField(schema, inputColName, dtype, outputColName)
+          )
+        } catch {
+          case _: AnalysisException =>
+            if (skipNonExistsCol) {
+              None
+            } else {
+              throw new SparkException(s"Input column $inputColName does not exist.")
+            }
         }
     }
     StructType(schema.fields ++ outputFields)
@@ -431,11 +443,8 @@ class StringIndexerModel (
       val labelToIndex = labelsToIndexArray(i)
       val labels = labelsArray(i)
 
-      if (!dataset.schema.fieldNames.contains(inputColName)) {
-        logWarning(log"Input column ${MDC(LogKeys.COLUMN_NAME, inputColName)} does not exist " +
-          log"during transformation. Skip StringIndexerModel for this column.")
-        outputColNames(i) = null
-      } else {
+      try {
+        dataset.col(inputColName)
         val filteredLabels = getHandleInvalid match {
           case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
           case _ => labels
@@ -449,9 +458,13 @@ c
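The old check `schema.fieldNames.contains(inputColName)` compares against top-level field names only, so a dotted path such as `user.country` looked missing. The new code instead resolves the name through `Dataset.col` and treats `AnalysisException` as "does not exist". The sketch below mimics that control flow on a toy nested schema. Everything in it is a hypothetical stand-in: nested dicts for struct types, `ColumnNotFound` for `AnalysisException`, and plain type-name strings. It also ignores backtick-quoted names that contain literal dots.

```python
from typing import Any, Dict, List, Tuple

# Toy schema: nested dicts are structs, strings are leaf type names (assumption).
Schema = Dict[str, Any]
STRING_OR_NUMERIC = {"string", "byte", "short", "int", "long", "float", "double"}


class ColumnNotFound(Exception):
    """Stand-in for the AnalysisException raised when a column cannot be resolved."""


def resolve_type(schema: Schema, col_name: str) -> str:
    """Walk a dotted path such as "user.country" down through nested structs."""
    node: Any = schema
    for part in col_name.split("."):
        if not isinstance(node, dict) or part not in node:
            raise ColumnNotFound(col_name)
        node = node[part]
    return "struct" if isinstance(node, dict) else node


def output_fields(
    schema: Schema, pairs: List[Tuple[str, str]], skip_non_exists_col: bool
) -> List[Tuple[str, str]]:
    """Return (output column, output type) for each resolvable input column."""
    fields = []
    for in_col, out_col in pairs:
        try:
            dtype = resolve_type(schema, in_col)
        except ColumnNotFound:
            if skip_non_exists_col:
                continue
            raise ValueError(f"Input column {in_col} does not exist.") from None
        if dtype not in STRING_OR_NUMERIC:
            raise ValueError(
                f"The input column {in_col} must be either string type or numeric type, "
                f"but got {dtype}."
            )
        fields.append((out_col, "double"))  # indices are emitted as doubles
    return fields


schema = {"id": "long", "user": {"country": "string", "age": "int"}}
print("user.country" in schema)  # False: a top-level-only check misses it
print(output_fields(schema, [("user.country", "country_idx")], False))  # [('country_idx', 'double')]
```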
(spark) branch master updated: [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 0fa5787d0a6b [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API

0fa5787d0a6b is described below

commit 0fa5787d0a6bd17ccd05ff561bc8dfa88af03312
Author: Weichen Xu
AuthorDate: Fri Jul 12 22:20:37 2024 +0800

[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API

### What changes were proposed in this pull request?
Replace RDD read / write API invocations with DataFrame read / write API invocations.

### Why are the changes needed?
In Databricks Runtime, the RDD read / write API has issues with certain storage types that require an account key, whereas the DataFrame read / write API works.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47328 from WeichenXu123/ml-df-writer-save-2.

Authored-by: Weichen Xu
Signed-off-by: Weichen Xu
---
 .../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala | 7 +--
 .../main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 +--
 .../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala | 7 +--
 .../org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala | 7 +--
 .../main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala | 7 +--
 .../src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala | 4 +++-
 .../main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala | 7 +--
 .../main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala | 7 +--
 .../main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 +--
 .../apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala | 7 +--
 .../scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala | 7 +--
 .../scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 +--
 .../scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala | 7 +--
 .../apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +++-
 .../src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala | 3 ++-
 .../org/apache/spark/ml/r/RandomForestClassifierWrapper.scala | 8 ++--
 .../org/apache/spark/ml/r/RandomForestRegressorWrapper.scala | 8 ++--
 mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 6 --
 24 files changed, 114 insertions(+), 45 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 7eef3ced422e..67057b3fcef6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index 125cdf7259fe..5fc19450d219 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mll
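Each R wrapper serializes a small `rMetadata` JSON object (`class` plus `features`) into a one-row text output and later parses it back from the first line. The sketch below shows the payload side of that round trip with Python's `json` module. It is an analogy, not the json4s code in the diff. The function names are hypothetical. `separators=(",", ":")` is used only to produce compact, single-line output comparable in spirit to `compact(render(...))`.

```python
import json
from typing import List, Tuple


def render_r_metadata(class_name: str, features: List[str]) -> str:
    """Compact, single-line JSON, analogous to json4s compact(render(...))."""
    return json.dumps({"class": class_name, "features": features}, separators=(",", ":"))


def parse_r_metadata(line: str) -> Tuple[str, List[str]]:
    """Inverse of render_r_metadata, like parse(...) then extracting "features"."""
    obj = json.loads(line)
    return obj["class"], [str(f) for f in obj["features"]]


line = render_r_metadata("org.apache.spark.ml.r.AFTSurvivalRegressionWrapper", ["x1", "x2"])
print(line)
print(parse_r_metadata(line)[1])  # ['x1', 'x2']
```

Keeping the JSON on one line matters because a text read splits rows on newlines. A pretty-printed document would come back from `first()` as just its opening `{`.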
(spark) branch master updated: [SPARK-47663][CORE][TESTS] add end to end test for task limiting according to different cpu and gpu configurations
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new eb9b12692601 [SPARK-47663][CORE][TESTS] add end to end test for task limiting according to different cpu and gpu configurations

eb9b12692601 is described below

commit eb9b126926016e0156b1d40b1d7ed33d4705d2bb
Author: Bobby Wang
AuthorDate: Tue Apr 2 15:30:10 2024 +0800

[SPARK-47663][CORE][TESTS] add end to end test for task limiting according to different cpu and gpu configurations

### What changes were proposed in this pull request?
Add an end-to-end unit test to ensure that the number of tasks is calculated correctly for different task CPU and task GPU amounts.

### Why are the changes needed?
To increase the test coverage. More details can be found at https://github.com/apache/spark/pull/45528#discussion_r1545905575

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The CI passes.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45794 from wbo4958/end2end-test.

Authored-by: Bobby Wang
Signed-off-by: Weichen Xu
---
 .../CoarseGrainedSchedulerBackendSuite.scala | 47 ++
 1 file changed, 47 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 6e94f9abe67b..a75f470deec3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -179,6 +179,53 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  // Every item corresponds to (CPU resources per task, GPU resources per task,
+  // and the GPU addresses assigned to all tasks).
+  Seq(
+    (1, 1, Array(Array("0"), Array("1"), Array("2"), Array("3"))),
+    (1, 2, Array(Array("0", "1"), Array("2", "3"))),
+    (1, 4, Array(Array("0", "1", "2", "3"))),
+    (2, 1, Array(Array("0"), Array("1"))),
+    (4, 1, Array(Array("0"))),
+    (4, 2, Array(Array("0", "1"))),
+    (2, 2, Array(Array("0", "1"), Array("2", "3"))),
+    (4, 4, Array(Array("0", "1", "2", "3"))),
+    (1, 3, Array(Array("0", "1", "2"))),
+    (3, 1, Array(Array("0")))
+  ).foreach { case (taskCpus, taskGpus, expectedGpuAddresses) =>
+    test(s"SPARK-47663 end to end test validating if task cpus:${taskCpus} and " +
+      s"task gpus: ${taskGpus} works") {
+      withTempDir { dir =>
+        val discoveryScript = createTempScriptWithExpectedOutput(
+          dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
+        val conf = new SparkConf()
+          .set(CPUS_PER_TASK, taskCpus)
+          .setMaster("local-cluster[1, 4, 1024]")
+          .setAppName("test")
+          .set(WORKER_GPU_ID.amountConf, "4")
+          .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+          .set(EXECUTOR_GPU_ID.amountConf, "4")
+          .set(TASK_GPU_ID.amountConf, taskGpus.toString)
+
+        sc = new SparkContext(conf)
+        eventually(timeout(executorUpTimeout)) {
+          // Ensure all executors have been launched.
+          assert(sc.getExecutorIds().length == 1)
+        }
+
+        val numPartitions = Seq(4 / taskCpus, 4 / taskGpus).min
+        val ret = sc.parallelize(1 to 20, numPartitions).mapPartitions { _ =>
+          val tc = TaskContext.get()
+          assert(tc.cpus() == taskCpus)
+          val gpus = tc.resources()("gpu").addresses
+          Iterator.single(gpus)
+        }.collect()
+
+        assert(ret === expectedGpuAddresses)
+      }
+    }
+  }
+
   // Here we just have test for one happy case instead of all cases: other cases are covered in
   // FsHistoryProviderSuite.
   test("custom log url for Spark UI is applied") {
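The expected table in this test follows from a simple rule. With 4 executor cores and 4 GPUs, the number of concurrently running tasks is `min(4 / taskCpus, 4 / taskGpus)` (integer division, as in `numPartitions` above), and each running task holds `taskGpus` addresses. The sketch below encodes that rule and hands addresses out in contiguous chunks. That reproduces every row of the test table, but the chunking is an assumption for illustration, not a description of how the scheduler actually picks addresses. The function name and its defaults are hypothetical.

```python
from typing import List


def expected_gpu_assignment(
    task_cpus: int, task_gpus: int, executor_cores: int = 4, executor_gpus: int = 4
) -> List[List[str]]:
    # Concurrent task slots are limited by whichever resource runs out first.
    slots = min(executor_cores // task_cpus, executor_gpus // task_gpus)
    addresses = [str(i) for i in range(executor_gpus)]
    # Give each slot a contiguous chunk of task_gpus addresses.
    return [addresses[s * task_gpus:(s + 1) * task_gpus] for s in range(slots)]


print(expected_gpu_assignment(1, 2))  # [['0', '1'], ['2', '3']]
print(expected_gpu_assignment(3, 1))  # [['0']]
```

For example, `(1, 3)` gives `min(4, 1) = 1` slot holding GPUs `0, 1, 2`, and GPU `3` stays idle, matching `Array(Array("0", "1", "2"))` in the test.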
(spark) branch master updated (081809667611 -> c4e4497ff7e7)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

from 081809667611 [MINOR][SQL] Remove `unsupportedOperationMsg` from `CaseInsensitiveStringMap`
add c4e4497ff7e7 [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile

No new revisions were added by this update.

Summary of changes:
 .../sql/connect/planner/SparkConnectPlanner.scala | 6 +-
 dev/sparktestsupport/modules.py | 1 +
 python/pyspark/sql/pandas/map_ops.py | 61 +++-
 python/pyspark/sql/tests/test_resources.py | 104 +
 .../catalyst/analysis/DeduplicateRelations.scala | 4 +-
 .../plans/logical/pythonLogicalOperators.scala | 7 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala | 9 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala | 17 +++-
 .../spark/sql/execution/SparkStrategies.scala | 8 +-
 .../sql/execution/python/MapInArrowExec.scala | 4 +-
 .../sql/execution/python/MapInBatchExec.scala | 6 +-
 .../sql/execution/python/MapInPandasExec.scala | 4 +-
 12 files changed, 206 insertions(+), 25 deletions(-)
 create mode 100644 python/pyspark/sql/tests/test_resources.py
[spark] branch master updated (8394ebb52b9 -> e1a7b84f47b)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

from 8394ebb52b9 [SPARK-45469][CORE][SQL][CONNECT][PYTHON] Replace `toIterator` with `iterator` for `IterableOnce`
add e1a7b84f47b [SPARK-45397][ML][CONNECT] Add array assembler feature transformer

No new revisions were added by this update.

Summary of changes:
 .../docs/source/reference/pyspark.ml.connect.rst | 1 +
 python/pyspark/ml/connect/base.py | 6 +-
 python/pyspark/ml/connect/feature.py | 156 -
 python/pyspark/ml/connect/util.py | 6 +-
 python/pyspark/ml/param/_shared_params_code_gen.py | 7 +
 python/pyspark/ml/param/shared.py | 22 +++
 .../ml/tests/connect/test_legacy_mode_feature.py | 48 +++
 7 files changed, 238 insertions(+), 8 deletions(-)
[spark] branch master updated: [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new a0c9ab63f3b [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188)

a0c9ab63f3b is described below

commit a0c9ab63f3bcf4c9bb56c407375ce1c8cc26fb02
Author: Emil Ejbyfeldt
AuthorDate: Mon Oct 2 11:36:53 2023 +0200

[SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188)

* SPARK-45386: Fix correctness issue with StorageLevel.NONE
* Move to CacheManager
* Add comment
---
 .../main/scala/org/apache/spark/sql/execution/CacheManager.scala | 4 +++-
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e0..e906c74f8a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       planToCache: LogicalPlan,
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
-    if (lookupCachedData(planToCache).nonEmpty) {
+    if (storageLevel == StorageLevel.NONE) {
+      // Do nothing for StorageLevel.NONE since it will not actually cache any data.
+    } else if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 04e619fa908..8fb25e120f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
@@ -2604,6 +2605,11 @@ class DatasetSuite extends QueryTest
         parameters = Map("cls" -> classOf[Array[Int]].getName))
     }
   }
+
+  test("SPARK-45386: persist with StorageLevel.NONE should give correct count") {
+    val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+    assert(ds.count() == 2)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest
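The fix is an early guard. When the requested storage level is `NONE`, `CacheManager` does not register a cache entry at all, because nothing would actually be stored. The sketch below shows the same guard ordering in a toy registry. `ToyCacheManager` and this `StorageLevel` enum are hypothetical stand-ins, not Spark classes. The sketch only illustrates the control flow: skip `NONE` first, then warn on an already-cached plan, otherwise register.

```python
from enum import Enum
from typing import Dict, List


class StorageLevel(Enum):
    """Toy stand-in; Spark's StorageLevel has many more levels."""
    NONE = "NONE"
    MEMORY_ONLY = "MEMORY_ONLY"


class ToyCacheManager:
    """Hypothetical cache registry illustrating the early-return guard."""

    def __init__(self) -> None:
        self.cached: Dict[str, StorageLevel] = {}
        self.warnings: List[str] = []

    def cache_query(self, plan_key: str, level: StorageLevel) -> None:
        if level is StorageLevel.NONE:
            # Nothing would be stored, so do not register a cache entry at all.
            return
        if plan_key in self.cached:
            self.warnings.append("Asked to cache already cached data.")
            return
        self.cached[plan_key] = level


cm = ToyCacheManager()
cm.cache_query("Seq(1, 2)", StorageLevel.NONE)
print(cm.cached)  # {}
```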
[spark] branch branch-3.5 updated: [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 40ccabfd681 [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality

40ccabfd681 is described below

commit 40ccabfd68141eabe8f9b9bf15acad9fc6b7dff1
Author: Weichen Xu
AuthorDate: Wed Aug 23 18:19:15 2023 +0800

[SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality

### What changes were proposed in this pull request?
Fix the cross validator `foldCol` param functionality. In the main branch, the code calls `df.rdd` APIs, which are not supported in Spark Connect.

### Why are the changes needed?
Bug fix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #42605 from WeichenXu123/fix-tuning-connect-foldCol.

Authored-by: Weichen Xu
Signed-off-by: Weichen Xu
(cherry picked from commit 0d1b5975b2d308c616312d53b9f7ad754348a266)
Signed-off-by: Weichen Xu
---
 python/pyspark/ml/connect/tuning.py | 24 ++---
 .../ml/tests/connect/test_legacy_mode_tuning.py | 25 ++
 2 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/ml/connect/tuning.py b/python/pyspark/ml/connect/tuning.py
index c22c31e84e8..871e448966c 100644
--- a/python/pyspark/ml/connect/tuning.py
+++ b/python/pyspark/ml/connect/tuning.py
@@ -42,8 +42,7 @@ from pyspark.ml.connect.io_utils import (
 )
 from pyspark.ml.param import Params, Param, TypeConverters
 from pyspark.ml.param.shared import HasParallelism, HasSeed
-from pyspark.sql.functions import col, lit, rand, UserDefinedFunction
-from pyspark.sql.types import BooleanType
+from pyspark.sql.functions import col, lit, rand
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql import SparkSession
 
@@ -477,23 +476,14 @@ class CrossValidator(
                 train = df.filter(~condition)
                 datasets.append((train, validation))
         else:
-            # Use user-specified fold numbers.
-            def checker(foldNum: int) -> bool:
-                if foldNum < 0 or foldNum >= nFolds:
-                    raise ValueError(
-                        "Fold number must be in range [0, %s), but got %s." % (nFolds, foldNum)
-                    )
-                return True
-
-            checker_udf = UserDefinedFunction(checker, BooleanType())
+            # TODO:
+            # Add verification that foldCol column values are in range [0, nFolds)
             for i in range(nFolds):
-                training = dataset.filter(checker_udf(dataset[foldCol]) & (col(foldCol) != lit(i)))
-                validation = dataset.filter(
-                    checker_udf(dataset[foldCol]) & (col(foldCol) == lit(i))
-                )
-                if training.rdd.getNumPartitions() == 0 or len(training.take(1)) == 0:
+                training = dataset.filter(col(foldCol) != lit(i))
+                validation = dataset.filter(col(foldCol) == lit(i))
+                if training.isEmpty():
                     raise ValueError("The training data at fold %s is empty." % i)
-                if validation.rdd.getNumPartitions() == 0 or len(validation.take(1)) == 0:
+                if validation.isEmpty():
                     raise ValueError("The validation data at fold %s is empty." % i)
                 datasets.append((training, validation))
 
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
index d6c813533d6..0ade227540c 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
@@ -246,6 +246,31 @@ class CrossValidatorTestsMixin:
             np.testing.assert_allclose(cv_model.avgMetrics, loaded_cv_model.avgMetrics)
             np.testing.assert_allclose(cv_model.stdMetrics, loaded_cv_model.stdMetrics)
 
+    def test_crossvalidator_with_fold_col(self):
+        sk_dataset = load_breast_cancer()
+
+        train_dataset = self.spark.createDataFrame(
+            zip(
+                sk_dataset.data.tolist(),
+                [int(t) for t in sk_dataset.target],
+                [int(i % 3) for i in range(len(sk_dataset.target))],
+            ),
+            schema="features: array, label: long, fold: long",
+        )
+
+        lorv2 = LORV2(numTrainWorkers=2)
+
+        grid2 = ParamGridBuilder().addGrid(lorv2.maxIter, [2, 200]).build()
+        cv = CrossValidator(
+            estimator=lorv2,
+            estimatorParamMaps=
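The new fold logic keeps rows whose fold value differs from `i` for training and rows equal to `i` for validation, and it rejects empty splits. The sketch below restates that logic on plain Python lists. It also adds the range check that the diff leaves as a TODO; that check is my addition, reusing the error text of the removed UDF. `split_by_fold_col` and the dict-based rows are hypothetical. The sketch says nothing about how the DataFrame version is executed.

```python
from typing import Dict, List, Tuple

Row = Dict[str, int]


def split_by_fold_col(
    rows: List[Row], fold_col: str, n_folds: int
) -> List[Tuple[List[Row], List[Row]]]:
    # Range check corresponding to the TODO in the diff (added here for illustration).
    bad = [r[fold_col] for r in rows if not 0 <= r[fold_col] < n_folds]
    if bad:
        raise ValueError(
            "Fold number must be in range [0, %s), but got %s." % (n_folds, bad[0])
        )
    datasets = []
    for i in range(n_folds):
        training = [r for r in rows if r[fold_col] != i]    # col(foldCol) != lit(i)
        validation = [r for r in rows if r[fold_col] == i]  # col(foldCol) == lit(i)
        if not training:
            raise ValueError("The training data at fold %s is empty." % i)
        if not validation:
            raise ValueError("The validation data at fold %s is empty." % i)
        datasets.append((training, validation))
    return datasets


rows = [{"x": i, "fold": i % 3} for i in range(9)]
train0, val0 = split_by_fold_col(rows, "fold", 3)[0]
print([r["x"] for r in val0])  # [0, 3, 6]
```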
[spark] branch master updated: [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0d1b5975b2d [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality 0d1b5975b2d is described below commit 0d1b5975b2d308c616312d53b9f7ad754348a266 Author: Weichen Xu AuthorDate: Wed Aug 23 18:19:15 2023 +0800 [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality ### What changes were proposed in this pull request? Fix cross validator foldCol param functionality. In main branch the code calls `df.rdd` APIs but it is not supported in spark connect ### Why are the changes needed? Bug fix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42605 from WeichenXu123/fix-tuning-connect-foldCol. Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/ml/connect/tuning.py| 24 ++--- .../ml/tests/connect/test_legacy_mode_tuning.py| 25 ++ 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/python/pyspark/ml/connect/tuning.py b/python/pyspark/ml/connect/tuning.py index c22c31e84e8..871e448966c 100644 --- a/python/pyspark/ml/connect/tuning.py +++ b/python/pyspark/ml/connect/tuning.py @@ -42,8 +42,7 @@ from pyspark.ml.connect.io_utils import ( ) from pyspark.ml.param import Params, Param, TypeConverters from pyspark.ml.param.shared import HasParallelism, HasSeed -from pyspark.sql.functions import col, lit, rand, UserDefinedFunction -from pyspark.sql.types import BooleanType +from pyspark.sql.functions import col, lit, rand from pyspark.sql.dataframe import DataFrame from pyspark.sql import SparkSession @@ -477,23 +476,14 @@ class CrossValidator( train = df.filter(~condition) datasets.append((train, validation)) else: -# Use user-specified fold numbers. 
-def checker(foldNum: int) -> bool: -if foldNum < 0 or foldNum >= nFolds: -raise ValueError( -"Fold number must be in range [0, %s), but got %s." % (nFolds, foldNum) -) -return True - -checker_udf = UserDefinedFunction(checker, BooleanType()) +# TODO: +# Add verification that foldCol column values are in range [0, nFolds) for i in range(nFolds): -training = dataset.filter(checker_udf(dataset[foldCol]) & (col(foldCol) != lit(i))) -validation = dataset.filter( -checker_udf(dataset[foldCol]) & (col(foldCol) == lit(i)) -) -if training.rdd.getNumPartitions() == 0 or len(training.take(1)) == 0: +training = dataset.filter(col(foldCol) != lit(i)) +validation = dataset.filter(col(foldCol) == lit(i)) +if training.isEmpty(): raise ValueError("The training data at fold %s is empty." % i) -if validation.rdd.getNumPartitions() == 0 or len(validation.take(1)) == 0: +if validation.isEmpty(): raise ValueError("The validation data at fold %s is empty." % i) datasets.append((training, validation)) diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py index d6c813533d6..0ade227540c 100644 --- a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py +++ b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py @@ -246,6 +246,31 @@ class CrossValidatorTestsMixin: np.testing.assert_allclose(cv_model.avgMetrics, loaded_cv_model.avgMetrics) np.testing.assert_allclose(cv_model.stdMetrics, loaded_cv_model.stdMetrics) +def test_crossvalidator_with_fold_col(self): +sk_dataset = load_breast_cancer() + +train_dataset = self.spark.createDataFrame( +zip( +sk_dataset.data.tolist(), +[int(t) for t in sk_dataset.target], +[int(i % 3) for i in range(len(sk_dataset.target))], +), +schema="features: array, label: long, fold: long", +) + +lorv2 = LORV2(numTrainWorkers=2) + +grid2 = ParamGridBuilder().addGrid(lorv2.maxIter, [2, 200]).build() +cv = CrossValidator( +estimator=lorv2, +estimatorParamMaps=grid2, 
+parallelism=2, +evaluator=BinaryClassificationEvaluator(), +foldCol=&q
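The fold-column branch in the tuning diff above works per fold index `i`. It builds a training set from rows whose fold value differs from `i` and a validation set from rows whose fold value equals `i`, and it raises an error if either set is empty. The range check on fold values is left as a TODO. Below is a minimal plain-Python sketch of that logic with the range check included. It assumes rows are dicts and uses a hypothetical helper `split_by_fold_col`; it is not the PySpark API.

```python
from typing import Dict, List, Tuple

Row = Dict[str, int]


def split_by_fold_col(
    rows: List[Row], fold_col: str, n_folds: int
) -> List[Tuple[List[Row], List[Row]]]:
    """Hypothetical helper mirroring the per-fold filters in CrossValidator."""
    # Range check that the commit leaves as a TODO: values must be in [0, n_folds).
    for row in rows:
        fold = row[fold_col]
        if fold < 0 or fold >= n_folds:
            raise ValueError(
                "Fold number must be in range [0, %s), but got %s." % (n_folds, fold)
            )

    datasets = []
    for i in range(n_folds):
        # Equivalent of dataset.filter(col(foldCol) != lit(i)) and == lit(i).
        training = [r for r in rows if r[fold_col] != i]
        validation = [r for r in rows if r[fold_col] == i]
        if not training:
            raise ValueError("The training data at fold %s is empty." % i)
        if not validation:
            raise ValueError("The validation data at fold %s is empty." % i)
        datasets.append((training, validation))
    return datasets


# Nine rows assigned round-robin to three folds, like the `i % 3` fold column in the test.
rows = [{"label": i % 2, "fold": i % 3} for i in range(9)]
folds = split_by_fold_col(rows, "fold", 3)
```

With this round-robin assignment, each fold gets three validation rows and six training rows.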
[spark] branch branch-3.5 updated: [SPARK-44909][ML] Skip starting torch distributor log streaming server when it is not available
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4f61662e91a [SPARK-44909][ML] Skip starting torch distributor log streaming server when it is not available 4f61662e91a is described below commit 4f61662e91a77315a9d3d7454884f47c8bb9a6b1 Author: Weichen Xu AuthorDate: Wed Aug 23 15:30:50 2023 +0800 [SPARK-44909][ML] Skip starting torch distributor log streaming server when it is not available ### What changes were proposed in this pull request? Skip starting the torch distributor log streaming server when it is not available. In some cases, e.g., in a Databricks Connect cluster, network limitations cause the log streaming server to fail to start, but this should not break the torch distributor training routine. This PR catches the exception raised from the log server `start` method and sets the server port to -1 if `start` fails. ### Why are the changes needed? In some cases, e.g., in a Databricks Connect cluster, network limitations cause the log streaming server to fail to start, but this should not break the torch distributor training routine. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42606 from WeichenXu123/fix-torch-log-server-in-connect-mode.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 80668dc1a36ac0def80f3c18f981fbdacfb2904d) Signed-off-by: Weichen Xu --- python/pyspark/ml/torch/distributor.py | 16 +--- python/pyspark/ml/torch/log_communication.py | 3 +++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py index b407672ac48..d0979f53d41 100644 --- a/python/pyspark/ml/torch/distributor.py +++ b/python/pyspark/ml/torch/distributor.py @@ -765,9 +765,19 @@ class TorchDistributor(Distributor): log_streaming_server = LogStreamingServer() self.driver_address = _get_conf(self.spark, "spark.driver.host", "") assert self.driver_address != "" -log_streaming_server.start(spark_host_address=self.driver_address) -time.sleep(1) # wait for the server to start -self.log_streaming_server_port = log_streaming_server.port +try: +log_streaming_server.start(spark_host_address=self.driver_address) +time.sleep(1) # wait for the server to start +self.log_streaming_server_port = log_streaming_server.port +except Exception as e: +# If starting log streaming server failed, we don't need to break +# the distributor training but emit a warning instead. 
+self.log_streaming_server_port = -1 +self.logger.warning( +"Start torch distributor log streaming server failed, " +"You cannot receive logs sent from distributor workers, ", +f"error: {repr(e)}.", +) try: spark_task_function = self._get_spark_task_function( diff --git a/python/pyspark/ml/torch/log_communication.py b/python/pyspark/ml/torch/log_communication.py index ca91121d3e3..8efa83e62c3 100644 --- a/python/pyspark/ml/torch/log_communication.py +++ b/python/pyspark/ml/torch/log_communication.py @@ -156,6 +156,9 @@ class LogStreamingClient(LogStreamingClientBase): warnings.warn(f"{error_msg}: {traceback.format_exc()}\n") def _connect(self) -> None: +if self.port == -1: +self._fail("Log streaming server is not available.") +return try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
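The fallback in this commit follows a sentinel-value pattern. If the server fails to start, the driver records port `-1`, and each client checks for that value before opening a socket. Below is a minimal, self-contained sketch of the pattern. It uses hypothetical `FlakyServer` and `Client` classes and a placeholder port; it is not the `LogStreamingServer` or `LogStreamingClient` code. The warning is logged as a single message with lazy `%r` formatting.

```python
import logging
import socket
import warnings

logger = logging.getLogger("distributor_sketch")

UNAVAILABLE_PORT = -1  # sentinel meaning "log streaming server did not start"


class FlakyServer:
    """Hypothetical stand-in for a log streaming server that may fail to start."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.port = None

    def start(self, host: str) -> None:
        if self.fail:
            raise OSError("network restricted")
        self.port = 12345  # placeholder port for the sketch


def start_log_server(server: FlakyServer, host: str) -> int:
    try:
        server.start(host)
        return server.port
    except Exception as e:
        # Do not break training: warn and fall back to the sentinel port.
        logger.warning(
            "Starting the log streaming server failed; logs sent from workers "
            "will not be received. Error: %r",
            e,
        )
        return UNAVAILABLE_PORT


class Client:
    """Hypothetical client that skips connecting when the server is unavailable."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self.failed = False

    def connect(self) -> None:
        if self.port == UNAVAILABLE_PORT:
            # Mirrors the early return added to LogStreamingClient._connect.
            warnings.warn("Log streaming server is not available.")
            self.failed = True
            return
        sock = socket.create_connection((self.host, self.port), timeout=1)
        sock.close()
```

Because the client returns before touching the network, training proceeds even when no logs can be streamed.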
[spark] branch master updated (00cd5e846b6 -> 80668dc1a36)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 00cd5e846b6 [SPARK-44899][PYTHON][DOCS] Refine the docstring of DataFrame.collect add 80668dc1a36 [SPARK-44909][ML] Skip starting torch distributor log streaming server when it is not available No new revisions were added by this update. Summary of changes: python/pyspark/ml/torch/distributor.py | 16 +--- python/pyspark/ml/torch/log_communication.py | 3 +++ 2 files changed, 16 insertions(+), 3 deletions(-)
[spark] branch master updated: [SPARK-44264][ML][PYTHON] Incorporating FunctionPickler Into TorchDistributor
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2ab70576d68 [SPARK-44264][ML][PYTHON] Incorporating FunctionPickler Into TorchDistributor 2ab70576d68 is described below commit 2ab70576d68d07aa69dc1e5a9264e0c9e9f05df0 Author: Mathew Jacob AuthorDate: Wed Jul 19 08:29:33 2023 +0800 [SPARK-44264][ML][PYTHON] Incorporating FunctionPickler Into TorchDistributor ### What changes were proposed in this pull request? For the pickling process when running distributed training on functions, I migrated the TorchDistributor to leverage the FunctionPickler class instead of internal functions. This separates the responsibility better and uses code already written and tested. FunctionPickler PR is [here](https://github.com/apache/spark/pull/41946). ### Why are the changes needed? Separates the responsibility, making TorchDistributor class deal with more focused responsibility. ### Does this PR introduce _any_ user-facing change? No, this is all internal. ### How was this patch tested? Existing TorchDistributor tests. Closes #42045 from mathewjacob1002/integrate_fn_pickler. 
Authored-by: Mathew Jacob Signed-off-by: Weichen Xu --- python/pyspark/ml/torch/distributor.py | 49 +- 1 file changed, 7 insertions(+), 42 deletions(-) diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py index 5b5d7c2288a..81c9d03abcf 100644 --- a/python/pyspark/ml/torch/distributor.py +++ b/python/pyspark/ml/torch/distributor.py @@ -48,6 +48,7 @@ from pyspark.ml.torch.log_communication import ( # type: ignore LogStreamingClient, LogStreamingServer, ) +from pyspark.ml.dl_util import FunctionPickler from pyspark.ml.util import _get_active_session @@ -746,12 +747,13 @@ class TorchDistributor(Distributor): train_fn: Callable, *args: Any, **kwargs: Any ) -> Generator[Tuple[str, str], None, None]: save_dir = TorchDistributor._create_save_dir() -pickle_file_path = TorchDistributor._save_pickled_function( -save_dir, train_fn, *args, **kwargs +pickle_file_path = FunctionPickler.pickle_fn_and_save( +train_fn, "", save_dir, *args, **kwargs ) output_file_path = os.path.join(save_dir, TorchDistributor._PICKLED_OUTPUT_FILE) -train_file_path = TorchDistributor._create_torchrun_train_file( -save_dir, pickle_file_path, output_file_path +script_path = os.path.join(save_dir, TorchDistributor._TRAIN_FILE) +train_file_path = FunctionPickler.create_fn_run_script( +pickle_file_path, output_file_path, script_path ) try: yield (train_file_path, output_file_path) @@ -817,7 +819,7 @@ class TorchDistributor(Distributor): "View stdout logs for detailed error message." ) try: -output = TorchDistributor._get_pickled_output(output_file_path) +output = FunctionPickler.get_fn_output(output_file_path) except Exception as e: raise RuntimeError( "TorchDistributor failed due to a pickling error. 
" @@ -834,43 +836,6 @@ class TorchDistributor(Distributor): def _cleanup_files(save_dir: str) -> None: shutil.rmtree(save_dir, ignore_errors=True) -@staticmethod -def _save_pickled_function( -save_dir: str, train_fn: Union[str, Callable], *args: Any, **kwargs: Any -) -> str: -saved_pickle_path = os.path.join(save_dir, TorchDistributor._PICKLED_FUNC_FILE) -with open(saved_pickle_path, "wb") as f: -cloudpickle.dump((train_fn, args, kwargs), f) -return saved_pickle_path - -@staticmethod -def _create_torchrun_train_file( -save_dir_path: str, pickle_file_path: str, output_file_path: str -) -> str: -code = textwrap.dedent( -f""" -from pyspark import cloudpickle -import os - -if __name__ == "__main__": -with open("{pickle_file_path}", "rb") as f: -train_fn, args, kwargs = cloudpickle.load(f) -output = train_fn(*args, **kwargs) -with open("{output_file_path}", "wb") as f: -cloudpickle.dump(output, f) -""" -) -saved_file_path = os.path.join(save_dir_path, TorchDistributor._TRAIN_FILE) -with open(saved_file_path, "w") as f: -f.w
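The removed helpers show the pattern that `FunctionPickler` now encapsulates:

- pickle `(train_fn, args, kwargs)` to a file;
- generate a small script that unpickles and calls the function;
- pickle the function's return value to an output file.

Below is a minimal sketch of that pattern. The function names are hypothetical, not `FunctionPickler`'s real signatures. It uses the standard-library `pickle` instead of `cloudpickle`, so only importable, module-level functions can be shipped. It also uses `runpy` in place of launching the script with `torchrun`.

```python
import operator
import os
import pickle
import runpy
import tempfile
import textwrap
from typing import Any, Callable


def pickle_fn_and_save(fn: Callable, save_dir: str, *args: Any, **kwargs: Any) -> str:
    path = os.path.join(save_dir, "func.pickle")
    with open(path, "wb") as f:
        pickle.dump((fn, args, kwargs), f)
    return path


def create_fn_run_script(pickle_path: str, output_path: str, script_path: str) -> str:
    # repr() quotes the paths so they are valid string literals in the generated code.
    code = textwrap.dedent(
        f"""
        import pickle

        if __name__ == "__main__":
            with open({pickle_path!r}, "rb") as f:
                fn, args, kwargs = pickle.load(f)
            output = fn(*args, **kwargs)
            with open({output_path!r}, "wb") as f:
                pickle.dump(output, f)
        """
    )
    with open(script_path, "w") as f:
        f.write(code)
    return script_path


def get_fn_output(output_path: str) -> Any:
    with open(output_path, "rb") as f:
        return pickle.load(f)


# Usage: ship operator.add(2, 3) through a file, "run" the script, read the result.
with tempfile.TemporaryDirectory() as save_dir:
    pkl = pickle_fn_and_save(operator.add, save_dir, 2, 3)
    out = os.path.join(save_dir, "output.pickle")
    script = create_fn_run_script(pkl, out, os.path.join(save_dir, "train.py"))
    runpy.run_path(script, run_name="__main__")  # torchrun would launch this instead
    result = get_fn_output(out)
```

Keeping the pickling, script generation and output retrieval in one helper lets the distributor focus on launching processes.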
[spark] branch master updated (0d90f2a8ea0 -> 054f94af95f)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 0d90f2a8ea0 [SPARK-44264][PYTHON][ML] Write a Deepspeed Distributed Learning Class DeepspeedTorchDistributor add 054f94af95f [SPARK-44374][PYTHON][ML] Add example code for distributed ML for spark connect No new revisions were added by this update. Summary of changes: python/pyspark/ml/connect/classification.py | 25 + python/pyspark/ml/connect/evaluation.py | 38 python/pyspark/ml/connect/feature.py| 56 + python/pyspark/ml/connect/pipeline.py | 28 +++ python/pyspark/ml/connect/tuning.py | 24 - 5 files changed, 170 insertions(+), 1 deletion(-)
[spark] branch master updated: [SPARK-43983][PYTHON][ML][CONNECT] Implement cross validator estimator
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 01918bb9017 [SPARK-43983][PYTHON][ML][CONNECT] Implement cross validator estimator 01918bb9017 is described below commit 01918bb90170c13abd6c0f0f5c47f5d9bcc02adc Author: Weichen Xu AuthorDate: Tue Jul 11 07:00:19 2023 +0800 [SPARK-43983][PYTHON][ML][CONNECT] Implement cross validator estimator ### What changes were proposed in this pull request? Implement cross validator estimator for spark connect. ### Why are the changes needed? Distributed ML on spark connect project. ### Does this PR introduce _any_ user-facing change? Yes. New class `pyspark.ml.connect.tuning.CrossValidator` and `pyspark.ml.connect.tuning.CrossValidatorModel` are added. ### How was this patch tested? Unit tests. Closes #41881 from WeichenXu123/SPARK-43983-cross-val. Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- dev/sparktestsupport/modules.py| 2 + python/pyspark/ml/connect/__init__.py | 2 + python/pyspark/ml/connect/base.py | 18 +- python/pyspark/ml/connect/evaluation.py| 83 ++- python/pyspark/ml/connect/io_utils.py | 76 ++- python/pyspark/ml/connect/pipeline.py | 47 +- python/pyspark/ml/connect/tuning.py| 566 + .../ml/tests/connect/test_connect_tuning.py| 45 ++ .../tests/connect/test_legacy_mode_evaluation.py | 31 ++ .../ml/tests/connect/test_legacy_mode_tuning.py| 267 ++ 10 files changed, 1080 insertions(+), 57 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 439ae40a0f8..2090546512f 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -620,6 +620,7 @@ pyspark_ml = Module( "pyspark.ml.tests.connect.test_legacy_mode_feature", "pyspark.ml.tests.connect.test_legacy_mode_classification", "pyspark.ml.tests.connect.test_legacy_mode_pipeline", 
+"pyspark.ml.tests.connect.test_legacy_mode_tuning", ], excluded_python_implementations=[ "PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there @@ -866,6 +867,7 @@ pyspark_connect = Module( "pyspark.ml.tests.connect.test_connect_feature", "pyspark.ml.tests.connect.test_connect_classification", "pyspark.ml.tests.connect.test_connect_pipeline", +"pyspark.ml.tests.connect.test_connect_tuning", ], excluded_python_implementations=[ "PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and diff --git a/python/pyspark/ml/connect/__init__.py b/python/pyspark/ml/connect/__init__.py index 2e048355d74..2ee152f6a38 100644 --- a/python/pyspark/ml/connect/__init__.py +++ b/python/pyspark/ml/connect/__init__.py @@ -26,6 +26,7 @@ from pyspark.ml.connect.base import ( from pyspark.ml.connect import ( feature, evaluation, +tuning, ) from pyspark.ml.connect.pipeline import Pipeline, PipelineModel @@ -39,4 +40,5 @@ __all__ = [ "evaluation", "Pipeline", "PipelineModel", +"tuning", ] diff --git a/python/pyspark/ml/connect/base.py b/python/pyspark/ml/connect/base.py index f86b1e928c2..f8ce0cb6962 100644 --- a/python/pyspark/ml/connect/base.py +++ b/python/pyspark/ml/connect/base.py @@ -146,7 +146,9 @@ class Transformer(Params, metaclass=ABCMeta): """ raise NotImplementedError() -def transform(self, dataset: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, pd.DataFrame]: +def transform( +self, dataset: Union[DataFrame, pd.DataFrame], params: Optional["ParamMap"] = None +) -> Union[DataFrame, pd.DataFrame]: """ Transforms the input dataset. The dataset can be either pandas dataframe or spark dataframe, @@ -163,12 +165,24 @@ class Transformer(Params, metaclass=ABCMeta): dataset : :py:class:`pyspark.sql.DataFrame` or py:class:`pandas.DataFrame` input dataset. +params : dict, optional +an optional param map that overrides embedded params. 
+ Returns --- :py:class:`pyspark.sql.DataFrame` or py:class:`pandas.DataFrame` transformed dataset, the type of output dataframe is consistent with input dataframe. """ +if params is None: +params = dict() +if isinstance(params, dict): +if par
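The new optional `params` argument lets a caller override embedded params for a single `transform` call. The cross validator needs this when it evaluates each param map. Below is a minimal sketch of the usual semantics:

- with no map or an empty map, `transform` uses the embedded values;
- with a non-empty map, it copies the transformer with the overrides applied, then transforms.

The sketch assumes a hypothetical `ScaleTransformer`, plain lists instead of DataFrames, and param maps keyed by strings rather than `Param` objects.

```python
import copy
from typing import Any, Dict, List, Optional


class ScaleTransformer:
    """Hypothetical transformer with one embedded param, 'factor'."""

    def __init__(self, factor: float = 1.0) -> None:
        self.params: Dict[str, Any] = {"factor": factor}

    def copy(self, extra: Optional[Dict[str, Any]] = None) -> "ScaleTransformer":
        that = copy.deepcopy(self)
        that.params.update(extra or {})
        return that

    def _transform(self, dataset: List[float]) -> List[float]:
        return [x * self.params["factor"] for x in dataset]

    def transform(
        self, dataset: List[float], params: Optional[Dict[str, Any]] = None
    ) -> List[float]:
        if params is None:
            params = dict()
        if isinstance(params, dict):
            if params:
                # Overrides apply to a copy, so this instance keeps its own params.
                return self.copy(params)._transform(dataset)
            return self._transform(dataset)
        raise TypeError("Params must be a param map but got %s." % type(params))


t = ScaleTransformer(factor=2.0)
default_out = t.transform([1.0, 2.0])
override_out = t.transform([1.0, 2.0], {"factor": 10.0})
```

Applying overrides to a copy means one transformer instance can be evaluated under several param maps without side effects.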
[spark] branch master updated (7bc28d54f83 -> 7fcabef2874)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 7bc28d54f83 [SPARK-44269][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2310-2314] add 7fcabef2874 [SPARK-44250][ML][PYTHON][CONNECT] Implement classification evaluator No new revisions were added by this update. Summary of changes: python/pyspark/ml/connect/evaluation.py| 161 - .../tests/connect/test_legacy_mode_evaluation.py | 77 +- 2 files changed, 202 insertions(+), 36 deletions(-)
[spark] branch master updated (0865c0db923 -> 35b3a18ff04)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 0865c0db923 [SPARK-43944][SPARK-43942][SQL][FOLLOWUP] Directly leverage `UnresolvedFunction` for functions `startswith`/`endswith`/`contains` add 35b3a18ff04 [SPARK-44100][ML][CONNECT][PYTHON] Move namespace from `pyspark.mlv2` to `pyspark.ml.connect` No new revisions were added by this update. Summary of changes: dev/sparktestsupport/modules.py| 18 +- python/mypy.ini| 3 -- python/pyspark/ml/connect/__init__.py | 24 + python/pyspark/{mlv2 => ml/connect}/base.py| 2 +- .../pyspark/{mlv2 => ml/connect}/classification.py | 6 ++-- python/pyspark/{mlv2 => ml/connect}/evaluation.py | 6 ++-- python/pyspark/{mlv2 => ml/connect}/feature.py | 6 ++-- python/pyspark/{mlv2 => ml/connect}/io_utils.py| 0 python/pyspark/{mlv2 => ml/connect}/pipeline.py| 8 ++--- python/pyspark/{mlv2 => ml/connect}/summarizer.py | 2 +- python/pyspark/{mlv2 => ml/connect}/util.py| 0 .../tests/connect/test_connect_classification.py} | 6 ++-- .../tests/connect/test_connect_evaluation.py} | 4 +-- .../tests/connect/test_connect_feature.py} | 4 +-- .../tests/connect/test_connect_pipeline.py}| 6 ++-- .../tests/connect/test_connect_summarizer.py} | 4 +-- .../connect/test_legacy_mode_classification.py}| 6 ++-- .../tests/connect/test_legacy_mode_evaluation.py} | 4 +-- .../tests/connect/test_legacy_mode_feature.py} | 4 +-- .../tests/connect/test_legacy_mode_pipeline.py}| 10 +++--- .../tests/connect/test_legacy_mode_summarizer.py} | 4 +-- python/pyspark/mlv2/__init__.py| 40 -- 22 files changed, 75 insertions(+), 92 deletions(-) rename python/pyspark/{mlv2 => ml/connect}/base.py (99%) rename python/pyspark/{mlv2 => ml/connect}/classification.py (98%) rename python/pyspark/{mlv2 => ml/connect}/evaluation.py (95%) rename python/pyspark/{mlv2 => ml/connect}/feature.py (97%) rename python/pyspark/{mlv2 => ml/connect}/io_utils.py (100%) 
rename python/pyspark/{mlv2 => ml/connect}/pipeline.py (96%) rename python/pyspark/{mlv2 => ml/connect}/summarizer.py (98%) rename python/pyspark/{mlv2 => ml/connect}/util.py (100%) copy python/pyspark/{mlv2/tests/connect/test_parity_classification.py => ml/tests/connect/test_connect_classification.py} (84%) rename python/pyspark/{mlv2/tests/connect/test_parity_evaluation.py => ml/tests/connect/test_connect_evaluation.py} (89%) rename python/pyspark/{mlv2/tests/connect/test_parity_feature.py => ml/tests/connect/test_connect_feature.py} (89%) rename python/pyspark/{mlv2/tests/connect/test_parity_classification.py => ml/tests/connect/test_connect_pipeline.py} (85%) rename python/pyspark/{mlv2/tests/connect/test_parity_summarizer.py => ml/tests/connect/test_connect_summarizer.py} (88%) rename python/pyspark/{mlv2/tests/test_classification.py => ml/tests/connect/test_legacy_mode_classification.py} (98%) rename python/pyspark/{mlv2/tests/test_evaluation.py => ml/tests/connect/test_legacy_mode_evaluation.py} (94%) rename python/pyspark/{mlv2/tests/test_feature.py => ml/tests/connect/test_legacy_mode_feature.py} (97%) rename python/pyspark/{mlv2/tests/test_pipeline.py => ml/tests/connect/test_legacy_mode_pipeline.py} (95%) rename python/pyspark/{mlv2/tests/test_summarizer.py => ml/tests/connect/test_legacy_mode_summarizer.py} (94%) delete mode 100644 python/pyspark/mlv2/__init__.py
[spark] branch master updated: [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on spark connect
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6c0c226d901 [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on spark connect 6c0c226d901 is described below commit 6c0c226d90192e54a4965b6d69905936619e20d6 Author: Weichen Xu AuthorDate: Mon Jun 19 21:36:21 2023 +0800 [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on spark connect ### What changes were proposed in this pull request? Implement pipeline estimator for ML on spark connect ### Why are the changes needed? See Distributed ML <> spark connect project design doc: https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.x8uc4xogrzbk ### Does this PR introduce _any_ user-facing change? Yes. New estimator `pyspark.mlv2.pipeline.Pipeline` is added. ### How was this patch tested? Unit tests. Closes #41479 from WeichenXu123/mlv2-pipeline. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/mlv2/__init__.py| 4 + python/pyspark/mlv2/classification.py | 6 +- python/pyspark/mlv2/feature.py | 6 +- python/pyspark/mlv2/io_utils.py| 187 ++ python/pyspark/mlv2/pipeline.py| 241 + python/pyspark/mlv2/tests/test_pipeline.py | 184 ++ 6 files changed, 561 insertions(+), 67 deletions(-) diff --git a/python/pyspark/mlv2/__init__.py b/python/pyspark/mlv2/__init__.py index 990b4fa9da8..352d24baabe 100644 --- a/python/pyspark/mlv2/__init__.py +++ b/python/pyspark/mlv2/__init__.py @@ -26,6 +26,8 @@ from pyspark.mlv2 import ( evaluation, ) +from pyspark.mlv2.pipeline import Pipeline, PipelineModel + __all__ = [ "Estimator", "Transformer", @@ -33,4 +35,6 @@ __all__ = [ "Model", "feature", "evaluation", +"Pipeline", +"PipelineModel", ] diff --git a/python/pyspark/mlv2/classification.py b/python/pyspark/mlv2/classification.py index fe0d76837f9..522c54b5289 100644 --- a/python/pyspark/mlv2/classification.py +++ b/python/pyspark/mlv2/classification.py @@ -40,7 +40,7 @@ from pyspark.ml.param.shared import ( HasMomentum, ) from pyspark.mlv2.base import Predictor, PredictionModel -from pyspark.mlv2.io_utils import ParamsReadWrite, ModelReadWrite +from pyspark.mlv2.io_utils import ParamsReadWrite, CoreModelReadWrite from pyspark.sql.functions import lit, count, countDistinct import torch @@ -253,7 +253,9 @@ class LogisticRegression( @inherit_doc -class LogisticRegressionModel(PredictionModel, _LogisticRegressionParams, ModelReadWrite): +class LogisticRegressionModel( +PredictionModel, _LogisticRegressionParams, ParamsReadWrite, CoreModelReadWrite +): """ Model fitted by LogisticRegression. 
diff --git a/python/pyspark/mlv2/feature.py b/python/pyspark/mlv2/feature.py index 57c6213d2bb..a58f214711c 100644 --- a/python/pyspark/mlv2/feature.py +++ b/python/pyspark/mlv2/feature.py @@ -24,7 +24,7 @@ from pyspark import keyword_only from pyspark.sql import DataFrame from pyspark.ml.param.shared import HasInputCol, HasOutputCol from pyspark.mlv2.base import Estimator, Model -from pyspark.mlv2.io_utils import ParamsReadWrite, ModelReadWrite +from pyspark.mlv2.io_utils import ParamsReadWrite, CoreModelReadWrite from pyspark.mlv2.summarizer import summarize_dataframe @@ -61,7 +61,7 @@ class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite): return self._copyValues(model) -class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ModelReadWrite): +class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreModelReadWrite): def __init__( self, max_abs_values: Optional["np.ndarray"] = None, n_samples_seen: Optional[int] = None ) -> None: @@ -143,7 +143,7 @@ class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite): return self._copyValues(model) -class StandardScalerModel(Model, HasInputCol, HasOutputCol, ModelReadWrite): +class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreModelReadWrite): def __init__( self, mean_values: Optional["np.ndarray"] = None, diff --git a/python/pyspark/mlv2/io_utils.py b/python/pyspark/mlv2/io_utils.py index 8f7263206a7..c701736712f 100644 --- a/python/pyspark/mlv2/io_utils.py +++ b/python/pyspark/mlv2/io_utils.py @@ -21,7 +21,8 @@ import os import tempfile import time from urllib.parse import urlparse -from typing import Any, Dict, Optional +from typing import Any, Dict,
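A pipeline estimator fits its stages in order. Each estimator is fitted on the data produced by the preceding stages. The resulting models, plus any plain transformers, make up the fitted pipeline model. Below is a toy sketch of that control flow. It uses hypothetical `Doubler` and `MeanCenterer` stages operating on Python lists; it is not the `pyspark.mlv2.pipeline` code.

```python
from typing import List, Sequence, Union


class Transformer:
    def transform(self, data: List[float]) -> List[float]:
        raise NotImplementedError


class Estimator:
    def fit(self, data: List[float]) -> Transformer:
        raise NotImplementedError


class Doubler(Transformer):
    def transform(self, data):
        return [2 * x for x in data]


class MeanCentererModel(Transformer):
    def __init__(self, mean: float) -> None:
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class MeanCenterer(Estimator):
    def fit(self, data):
        return MeanCentererModel(sum(data) / len(data))


class PipelineModel(Transformer):
    def __init__(self, stages: List[Transformer]) -> None:
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class Pipeline(Estimator):
    def __init__(self, stages: Sequence[Union[Estimator, Transformer]]) -> None:
        self.stages = list(stages)

    def fit(self, data):
        # Stages after the last estimator never need to transform data during fit.
        last_est = max(
            (i for i, s in enumerate(self.stages) if isinstance(s, Estimator)),
            default=-1,
        )
        fitted: List[Transformer] = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(data) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < last_est:
                data = model.transform(data)
        return PipelineModel(fitted)


model = Pipeline([Doubler(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
out = model.transform([1.0, 2.0, 3.0])
```

In this example, `MeanCenterer` is fitted on the doubled data `[2.0, 4.0, 6.0]`, so the learned mean is 4.0.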
[spark] branch master updated: [SPARK-43097][FOLLOW-UP][ML] Improve logistic regression model saving
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4e0bd3c5717 [SPARK-43097][FOLLOW-UP][ML] Improve logistic regression model saving 4e0bd3c5717 is described below commit 4e0bd3c571758f441d56d0341d6c4f506fdb1565 Author: Weichen Xu AuthorDate: Sat Jun 17 22:30:39 2023 +0800 [SPARK-43097][FOLLOW-UP][ML] Improve logistic regression model saving ### What changes were proposed in this pull request? Improve logistic regression model saving: the current master code saves a core PyTorch model that only includes the "Linear" layer. To make the saved PyTorch model easier to use on its own, without PySpark, I append a "softmax" layer to the torch model and then save it. ### Why are the changes needed? To improve the PyTorch model saved by `LogisticRegressionModel.save`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Closes #41629 from WeichenXu123/improve-lor-model-save.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/mlv2/classification.py| 9 +++-- python/pyspark/mlv2/tests/test_classification.py | 19 +++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mlv2/classification.py b/python/pyspark/mlv2/classification.py index 0fcded0d769..fe0d76837f9 100644 --- a/python/pyspark/mlv2/classification.py +++ b/python/pyspark/mlv2/classification.py @@ -331,10 +331,15 @@ class LogisticRegressionModel(PredictionModel, _LogisticRegressionParams, ModelR return self.__class__.__name__ + ".torch" def _save_core_model(self, path: str) -> None: -torch.save(self.torch_model, path) +lor_torch_model = torch_nn.Sequential( +self.torch_model, +torch_nn.Softmax(dim=1), +) +torch.save(lor_torch_model, path) def _load_core_model(self, path: str) -> None: -self.torch_model = torch.load(path) +lor_torch_model = torch.load(path) +self.torch_model = lor_torch_model[0] def _get_extra_metadata(self) -> Dict[str, Any]: return { diff --git a/python/pyspark/mlv2/tests/test_classification.py b/python/pyspark/mlv2/tests/test_classification.py index 159862ef5f6..7f7d43b9cc8 100644 --- a/python/pyspark/mlv2/tests/test_classification.py +++ b/python/pyspark/mlv2/tests/test_classification.py @@ -167,10 +167,29 @@ class ClassificationTestsMixin: ) model = estimator.fit(training_dataset) +model_predictions = model.transform(eval_df1.toPandas()) + assert model.uid == estimator.uid local_model_path = os.path.join(tmp_dir, "model") model.saveToLocal(local_model_path) + +# test saved torch model can be loaded by pytorch solely +lor_torch_model = torch.load( +os.path.join(local_model_path, "LogisticRegressionModel.torch") +) + +with torch.inference_mode(): +torch_infer_result = lor_torch_model( +torch.tensor(np.stack(list(eval_df1.toPandas().features)), dtype=torch.float32) +).numpy() + +np.testing.assert_allclose( +np.stack(list(model_predictions.probability)), +torch_infer_result, +rtol=1e-4, +) + loaded_model = 
LORV2Model.loadFromLocal(local_model_path) assert loaded_model.numFeatures == 2 assert loaded_model.numClasses == 2
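The follow-up saves `Sequential(linear, Softmax(dim=1))`, so loading the file with PyTorch alone yields class probabilities. When the model is loaded back, element `[0]` is taken to recover the bare linear layer. Below is a plain-Python sketch of that composition, with no PyTorch. It assumes hypothetical 2-feature, 2-class weights; the list of stages stands in for `Sequential`.

```python
import math
from typing import Callable, List, Sequence

Matrix = List[List[float]]


def linear(weights: Matrix, bias: List[float]) -> Callable[[Matrix], Matrix]:
    # Each output row is x @ W^T + b, the same computation as torch.nn.Linear.
    def apply(batch: Matrix) -> Matrix:
        return [
            [sum(w * x for w, x in zip(w_row, row)) + b for w_row, b in zip(weights, bias)]
            for row in batch
        ]

    return apply


def softmax_rows(batch: Matrix) -> Matrix:
    # Softmax over dim=1 (per row), shifted by the row max for numerical stability.
    out = []
    for row in batch:
        m = max(row)
        exps = [math.exp(v - m) for v in row]
        total = sum(exps)
        out.append([e / total for e in exps])
    return out


def sequential(stages: Sequence[Callable[[Matrix], Matrix]]) -> Callable[[Matrix], Matrix]:
    def apply(batch: Matrix) -> Matrix:
        for stage in stages:
            batch = stage(batch)
        return batch

    return apply


# Hypothetical weights: the class-1 logit follows feature 0, the class-0 logit follows feature 1.
stages = [linear([[0.0, 1.0], [1.0, 0.0]], [0.0, 0.0]), softmax_rows]
saved_model = sequential(stages)  # what a standalone user would call
core_linear = stages[0]  # what loading back keeps, like lor_torch_model[0]

probs = saved_model([[2.0, 1.0], [0.0, 0.0]])
logits = core_linear([[2.0, 1.0]])
```

Every row of `probs` sums to 1. `core_linear` still returns raw logits, which is what the PySpark model wraps internally.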
[spark] branch master updated: [SPARK-43981][PYTHON][ML] Basic saving / loading implementation for ML on spark connect
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a5d3bea04eb [SPARK-43981][PYTHON][ML] Basic saving / loading implementation for ML on spark connect a5d3bea04eb is described below commit a5d3bea04eb8430fd747905633b8164e21c95190 Author: Weichen Xu AuthorDate: Wed Jun 14 12:42:22 2023 +0800 [SPARK-43981][PYTHON][ML] Basic saving / loading implementation for ML on spark connect ### What changes were proposed in this pull request? * Base class / helper functions for saving/loading estimator / transformer / evaluator / model. * Add saving/loading implementation for feature transformers. * Add saving/loading implementation for logistic regression estimator. Design goals: * The model format is decoupled from spark, i.e. we can run model inference without spark service. * We can save model to either local file system or cloud storage file system. ### Why are the changes needed? We need to support saving/loading estimator / transformer / evaluator / model. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Unit tests. Closes #41478 from WeichenXu123/mlv2-read-write. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- .../scala/org/apache/spark/ml/python/MLUtil.scala | 43 python/pyspark/ml/torch/distributor.py | 14 +- python/pyspark/ml/util.py | 13 ++ python/pyspark/mlv2/classification.py | 118 ++ python/pyspark/mlv2/evaluation.py | 3 +- python/pyspark/mlv2/feature.py | 133 --- python/pyspark/mlv2/io_utils.py| 242 + python/pyspark/mlv2/summarizer.py | 4 +- .../tests/connect/test_parity_classification.py| 6 +- python/pyspark/mlv2/tests/test_classification.py | 84 ++- python/pyspark/mlv2/tests/test_feature.py | 58 - 11 files changed, 634 insertions(+), 84 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala new file mode 100644 index 000..5e2b8943ed8 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.python + +import java.nio.file.Paths + +import org.apache.hadoop.fs.{Path => FSPath} + +import org.apache.spark.sql.SparkSession + + +object MLUtil { + + def copyFileFromLocalToFs(localPath: String, destPath: String): Unit = { +val sparkContext = SparkSession.getActiveSession.get.sparkContext + +val hadoopConf = sparkContext.hadoopConfiguration +assert( + Paths.get(destPath).isAbsolute, + "Destination path must be an absolute path on cloud storage." +) +val destFSPath = new FSPath(destPath) +val fs = destFSPath.getFileSystem(hadoopConf) + +fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath) + } + +} diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py index be49dc147c0..2ed70854cc6 100644 --- a/python/pyspark/ml/torch/distributor.py +++ b/python/pyspark/ml/torch/distributor.py @@ -48,19 +48,7 @@ from pyspark.ml.torch.log_communication import ( # type: ignore LogStreamingClient, LogStreamingServer, ) - - -def _get_active_session(is_remote: bool) -> SparkSession: -if not is_remote: -spark = SparkSession.getActiveSession() -else: -import pyspark.sql.connect.session - -spark = pyspark.sql.connect.session._active_spark_session # type: ignore[assignment] - -if spark is None: -raise RuntimeError("An active SparkSession is required for the distributor.") -return spark +from pyspark.ml.util import _get_active_session def _ge
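`MLUtil.copyFileFromLocalToFs` first asserts that the destination is an absolute path. It then delegates to Hadoop's `FileSystem.copyFromLocalFile(delSrc = false, overwrite = true, src, dst)`. This lets a model be written to a local temporary directory first and uploaded afterwards. Below is a minimal local-filesystem sketch of that save-then-copy flow. It assumes a hypothetical `save_model` that writes a JSON metadata file, and it uses `shutil` as a stand-in for the Hadoop copy. The metadata keys are illustrative only.

```python
import json
import os
import shutil
import tempfile
from typing import Any, Dict


def copy_file_from_local_to_fs(local_path: str, dest_path: str) -> None:
    # Same precondition as MLUtil.copyFileFromLocalToFs.
    assert os.path.isabs(dest_path), "Destination path must be an absolute path."
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    # Keep the source (delSrc = false) and overwrite the destination (overwrite = true).
    shutil.copyfile(local_path, dest_path)


def save_model(metadata: Dict[str, Any], dest_dir: str) -> str:
    # Write to a local temp dir first, then copy to the destination "file system".
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_file = os.path.join(tmp_dir, "metadata.json")
        with open(local_file, "w") as f:
            json.dump(metadata, f)
        dest_file = os.path.join(dest_dir, "metadata.json")
        copy_file_from_local_to_fs(local_file, dest_file)
    return dest_file


# Usage: "upload" into another temporary directory and read the metadata back.
with tempfile.TemporaryDirectory() as target:
    saved = save_model(
        {"class": "LogisticRegressionModel", "numFeatures": 2}, os.path.abspath(target)
    )
    with open(saved) as f:
        loaded = json.load(f)
```

Writing locally first keeps the serialization code independent of the target file system. Only the final copy step needs to know about cloud storage.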
[spark] branch master updated (fead8a7962a -> 89de4f79e7f)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts add 89de4f79e7f [SPARK-43790][PYTHON][CONNECT][ML] Add `copyFromLocalToFs` API No new revisions were added by this update. Summary of changes: .../artifact/SparkConnectArtifactManager.scala | 39 ++ .../apache/spark/sql/connect/config/Connect.scala | 15 + .../service/SparkConnectAddArtifactsHandler.scala | 7 +++- .../connect/artifact/ArtifactManagerSuite.scala| 18 +- python/pyspark/sql/connect/client/artifact.py | 33 +++--- python/pyspark/sql/connect/client/core.py | 3 ++ python/pyspark/sql/connect/session.py | 29 .../sql/tests/connect/client/test_artifact.py | 21 8 files changed, 158 insertions(+), 7 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-43097][ML] New pyspark ML logistic regression estimator implemented on top of distributor
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a82b42bcb1 [SPARK-43097][ML] New pyspark ML logistic regression estimator implemented on top of distributor 2a82b42bcb1 is described below commit 2a82b42bcb100fade622d3d08ef5d316425d3e5a Author: Weichen Xu AuthorDate: Tue Jun 6 21:53:46 2023 +0800 [SPARK-43097][ML] New pyspark ML logistic regression estimator implemented on top of distributor ### What changes were proposed in this pull request? This PR takes over https://github.com/apache/spark/pull/40748 Closes https://github.com/apache/spark/pull/40748 ### Why are the changes needed? Part of the distributed ML on Spark Connect project. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Unit tests, plus manual testing. Start a local pyspark shell with: ``` bin/pyspark --remote "local[2]" # spark connect mode ``` or ``` bin/pyspark --master "local[2]" # legacy mode ``` Then paste the following code into the pyspark shell: ``` from pyspark.mlv2.classification import LogisticRegression as LORV2 lorv2 = LORV2(maxIter=2, numTrainWorkers=2) from pyspark.ml.linalg import Vectors df = spark.createDataFrame( [ (1.0, Vectors.dense(0.0, 5.0)), (0.0, Vectors.dense(1.0, 2.0)), (1.0, Vectors.dense(2.0, 1.0)), (0.0, Vectors.dense(3.0, 3.0)), ] * 100, ["label", "features"], ) model = lorv2.fit(df) model.transform(df).show(truncate=False) model.transform(df.toPandas()) model.set(model.probabilityCol, "") model.transform(df).show(truncate=False) model.transform(df.toPandas()) ``` Closes #41383 from WeichenXu123/lor-torch-2.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- dev/sparktestsupport/modules.py| 2 + python/pyspark/ml/param/_shared_params_code_gen.py | 24 ++ python/pyspark/ml/param/shared.py | 89 ++ python/pyspark/ml/torch/data.py| 13 +- python/pyspark/ml/torch/distributor.py | 13 +- python/pyspark/mlv2/base.py| 75 - python/pyspark/mlv2/classification.py | 306 + .../tests/connect/test_parity_classification.py| 41 +++ python/pyspark/mlv2/tests/test_classification.py | 139 ++ 9 files changed, 696 insertions(+), 6 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index aa755033aa4..ecc471fd700 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -609,6 +609,7 @@ pyspark_ml = Module( "pyspark.mlv2.tests.test_summarizer", "pyspark.mlv2.tests.test_evaluation", "pyspark.mlv2.tests.test_feature", +"pyspark.mlv2.tests.test_classification", ], excluded_python_implementations=[ "PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there @@ -823,6 +824,7 @@ pyspark_connect = Module( "pyspark.mlv2.tests.connect.test_parity_summarizer", "pyspark.mlv2.tests.connect.test_parity_evaluation", "pyspark.mlv2.tests.connect.test_parity_feature", +"pyspark.mlv2.tests.connect.test_parity_classification", ], excluded_python_implementations=[ "PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 5df1782084a..2bec3a5053f 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -332,6 +332,30 @@ if __name__ == "__main__": "0.0", "TypeConverters.toFloat", ), +( +"numTrainWorkers", +"number of training workers", +"1", +"TypeConverters.toInt", +), +( +"batchSize", +"number of training batch size", +None, +"TypeConverters.toInt", +), +( +"learningRate", +"learning rate for 
training", +None, +"TypeConverters.toFloat", +), +( +"momentum", +"momentum for training op
[spark] branch master updated (51a919ea8d6 -> 1df1d7661a3)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 51a919ea8d6 [SPARK-43973][SS][UI] Structured Streaming UI should display failed queries correctly add 1df1d7661a3 [SPARK-43516][ML][PYTHON] Update MLv2 Transformer interfaces No new revisions were added by this update. Summary of changes: python/pyspark/mlv2/base.py | 24 - python/pyspark/mlv2/feature.py| 16 +++--- python/pyspark/mlv2/tests/test_feature.py | 6 ++ python/pyspark/mlv2/util.py | 35 --- 4 files changed, 51 insertions(+), 30 deletions(-)
[spark] branch master updated (c618dabc96a -> fc3489d8bb6)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c618dabc96a Revert "[SPARK-43911][SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array" add fc3489d8bb6 [SPARK-43783][SPARK-43784][SPARK-43788][ML] Make MLv2 (ML on spark connect) supports pandas >= 2.0 No new revisions were added by this update. Summary of changes: python/pyspark/mlv2/summarizer.py| 2 +- python/pyspark/mlv2/tests/test_feature.py| 10 -- python/pyspark/mlv2/tests/test_summarizer.py | 6 -- 3 files changed, 1 insertion(+), 17 deletions(-)
[spark] branch master updated: [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c3b62708cd6 [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect c3b62708cd6 is described below commit c3b62708cd6371da08943e572a9bc0a45c1dcced Author: Weichen Xu AuthorDate: Fri Jun 2 20:28:01 2023 +0800 [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect ### What changes were proposed in this pull request? Drop vector type support in Distributed ML for Spark Connect. ### Why are the changes needed? Distributed ML is designed to support fitting and transforming over either a Spark DataFrame or a local pandas DataFrame. pandas DataFrames currently have no vector type equivalent to `spark.ml.linalg.Vector`, and the vector type offers little advantage beyond compact storage of sparse feature datasets. To keep the interface consistent, we decided that the initial version will not support the vector type. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Closes #41420 from WeichenXu123/mlv2-drop-vector-type-support.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/mlv2/base.py | 6 +- python/pyspark/mlv2/tests/test_feature.py| 16 +++- python/pyspark/mlv2/tests/test_summarizer.py | 10 -- python/pyspark/mlv2/util.py | 19 --- 4 files changed, 16 insertions(+), 35 deletions(-) diff --git a/python/pyspark/mlv2/base.py b/python/pyspark/mlv2/base.py index dc503db71c0..63631eccf2f 100644 --- a/python/pyspark/mlv2/base.py +++ b/python/pyspark/mlv2/base.py @@ -134,7 +134,11 @@ class Transformer(Params, metaclass=ABCMeta): def _get_transform_fn(self) -> Callable[["pd.Series"], Any]: """ Return a transformation function that accepts an instance of `pd.Series` as input and -returns transformed result as an instance of `pd.Series` or `pd.DataFrame` +returns transformed result as an instance of `pd.Series` or `pd.DataFrame`. +If there's only one output column, the transformed result must be an +instance of `pd.Series`, if there are multiple output columns, the transformed result +must be an instance of `pd.DataFrame` with column names matching output schema +returned by `_output_columns` interface. 
""" raise NotImplementedError() diff --git a/python/pyspark/mlv2/tests/test_feature.py b/python/pyspark/mlv2/tests/test_feature.py index 8bc9d4c2307..eed04217a6f 100644 --- a/python/pyspark/mlv2/tests/test_feature.py +++ b/python/pyspark/mlv2/tests/test_feature.py @@ -21,8 +21,6 @@ from distutils.version import LooseVersion import numpy as np import pandas as pd -from pyspark.ml.functions import vector_to_array -from pyspark.ml.linalg import Vectors from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler from pyspark.sql import SparkSession @@ -35,8 +33,8 @@ class FeatureTestsMixin: def test_max_abs_scaler(self): df1 = self.spark.createDataFrame( [ -(Vectors.dense([2.0, 3.5, 1.5]),), -(Vectors.dense([-3.0, -0.5, -2.5]),), +([2.0, 3.5, 1.5],), +([-3.0, -0.5, -2.5],), ], schema=["features"], ) @@ -49,7 +47,7 @@ class FeatureTestsMixin: np.testing.assert_allclose(list(result.scaled_features), expected_result) -local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas() +local_df1 = df1.toPandas() local_fit_model = scaler.fit(local_df1) local_transform_result = local_fit_model.transform(local_df1) @@ -62,9 +60,9 @@ class FeatureTestsMixin: def test_standard_scaler(self): df1 = self.spark.createDataFrame( [ -(Vectors.dense([2.0, 3.5, 1.5]),), -(Vectors.dense([-3.0, -0.5, -2.5]),), -(Vectors.dense([1.0, -1.5, 0.5]),), +([2.0, 3.5, 1.5],), +([-3.0, -0.5, -2.5],), +([1.0, -1.5, 0.5],), ], schema=["features"], ) @@ -81,7 +79,7 @@ class FeatureTestsMixin: np.testing.assert_allclose(list(result.scaled_features), expected_result) -local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas() +local_df1 = df1.toPandas() local_fit_model = scaler.fit(local_df1) local_transform_result = local_fit_model.transform(local_df1) diff --git a/python/
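`test_max_abs_scaler` now builds its input from plain Python lists instead of `Vectors.dense`. Below is a rough illustration of what max-abs scaling computes on that input, without external dependencies: each feature is divided by its maximum absolute value across rows. `max_abs_scale` is a hypothetical helper for this sketch, not the `pyspark.mlv2.feature.MaxAbsScaler` implementation.

```python
from typing import List


def max_abs_scale(rows: List[List[float]]) -> List[List[float]]:
    """Scale each feature (column) into [-1, 1] by its maximum absolute value."""
    num_features = len(rows[0])
    max_abs = [max(abs(row[i]) for row in rows) for i in range(num_features)]
    # Divide all-zero columns by 1.0 so they stay zero instead of dividing by zero.
    scale = [m if m != 0.0 else 1.0 for m in max_abs]
    return [[row[i] / scale[i] for i in range(num_features)] for row in rows]


features = [
    [2.0, 3.5, 1.5],
    [-3.0, -0.5, -2.5],
]
scaled = max_abs_scale(features)
for row in scaled:
    print([round(v, 4) for v in row])
```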
[spark] branch master updated: [SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid duplicated log to stdout redirection
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 480b14f4b45 [SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid duplicated log to stdout redirection 480b14f4b45 is described below commit 480b14f4b45a4e01e7e4bdea82475d4c77a6b89f Author: Weichen Xu AuthorDate: Thu Jun 1 16:26:49 2023 +0800 [SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid duplicated log to stdout redirection ### What changes were proposed in this pull request? Torch distributor log streaming server: avoid redirecting duplicated logs to stdout. When the log streaming client and the driver-side log streaming server run on the same node, the client now skips its own STDOUT redirection. ### Why are the changes needed? In some cases (typically Spark local mode), remote tasks run on the same node as the Spark driver. Both the torch process created by the Spark task and the driver-side torch distributor log streaming server then redirect logs to STDOUT, so every log line is printed twice. This PR fixes that case. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT. Closes #41404 from WeichenXu123/torch-distributor-avoid-dup-log.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/ml/torch/distributor.py | 17 - 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py index 711f76db09b..2af92f8399f 100644 --- a/python/pyspark/ml/torch/distributor.py +++ b/python/pyspark/ml/torch/distributor.py @@ -462,7 +462,22 @@ class TorchDistributor(Distributor): decoded = line.decode() tail.append(decoded) if redirect_to_stdout: -sys.stdout.write(decoded) +if ( +log_streaming_client +and not log_streaming_client.failed +and ( +log_streaming_client.sock.getsockname()[0] +== log_streaming_client.sock.getpeername()[0] +) +): +# If log_streaming_client and log_stream_server are in the same +# node (typical case is spark local mode), +# server side will redirect the log to STDOUT, +# to avoid STDOUT outputs duplication, skip redirecting +# logs to STDOUT in client side. +pass +else: +sys.stdout.write(decoded) if log_streaming_client: log_streaming_client.send(decoded.rstrip()) task.wait()
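The duplicate-log fix decides whether client and server share a node by comparing the local and peer IP addresses of the client's socket (`sock.getsockname()[0] == sock.getpeername()[0]`). Below is a minimal standalone sketch of that check using the standard `socket` module. It assumes a loopback listener can stand in for the driver-side log streaming server, and `is_same_host` is a hypothetical helper name.

```python
import socket


def is_same_host(sock: socket.socket) -> bool:
    """Return True if both ends of a connected TCP socket share one IP address."""
    local_ip = sock.getsockname()[0]
    peer_ip = sock.getpeername()[0]
    return local_ip == peer_ip


# A loopback listener stands in for the driver-side log streaming server.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
server.listen(1)
port = server.getsockname()[1]

client = socket.create_connection(("127.0.0.1", port))
conn, _ = server.accept()
try:
    # Both ends are 127.0.0.1, so a client would skip its own STDOUT redirect.
    same = is_same_host(client)
    print(same)
finally:
    client.close()
    conn.close()
    server.close()
```

The check is purely address-based. It assumes the client reaches the server through an address the host also uses as its own source address, which holds in the local-mode case the commit targets.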
[spark] branch master updated: [SPARK-43081][ML][FOLLOW-UP] Improve torch distributor data loader code
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c2060e7c0a3 [SPARK-43081][ML][FOLLOW-UP] Improve torch distributor data loader code c2060e7c0a3 is described below commit c2060e7c0a332c20f527adeb34a52042237430e4 Author: Weichen Xu AuthorDate: Wed May 31 16:34:19 2023 +0800 [SPARK-43081][ML][FOLLOW-UP] Improve torch distributor data loader code ### What changes were proposed in this pull request? ### Why are the changes needed? Improve torch distributor data loader code: * Verify that `num_processes` matches the number of partitions of the input Spark DataFrame. This makes debugging easier when a user passes a mismatched input DataFrame; otherwise the torch package raises an obscure error. * Improve column value conversion in the torch data loader: resolve one converter per column up front instead of checking the field type for every column value. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Closes #41382 from WeichenXu123/improve-torch-dataloader.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/ml/torch/data.py| 50 -- python/pyspark/ml/torch/distributor.py | 6 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/python/pyspark/ml/torch/data.py b/python/pyspark/ml/torch/data.py index a52b96a9392..d421683c16d 100644 --- a/python/pyspark/ml/torch/data.py +++ b/python/pyspark/ml/torch/data.py @@ -17,7 +17,7 @@ import torch import numpy as np -from typing import Any, Iterator +from typing import Any, Callable, Iterator from pyspark.sql.types import StructType @@ -26,27 +26,37 @@ class _SparkPartitionTorchDataset(torch.utils.data.IterableDataset): self.arrow_file_path = arrow_file_path self.num_samples = num_samples self.field_types = [field.dataType.simpleString() for field in schema] +self.field_converters = [ +_SparkPartitionTorchDataset._get_field_converter(field_type) +for field_type in self.field_types +] @staticmethod -def _extract_field_value(value: Any, field_type: str) -> Any: -# TODO: avoid checking field type for every row. +def _get_field_converter(field_type: str) -> Callable[[Any], Any]: if field_type == "vector": -if value["type"] == 1: -# dense vector -return value["values"] -if value["type"] == 0: -# sparse vector -size = int(value["size"]) -sparse_array = np.zeros(size, dtype=np.float64) -sparse_array[value["indices"]] = value["values"] -return sparse_array -if field_type in ["float", "double", "int", "bigint", "smallint"]: -return value -raise ValueError( -"SparkPartitionTorchDataset does not support loading data from field of " -f"type {field_type}." 
-) +def converter(value: Any) -> Any: +if value["type"] == 1: +# dense vector +return value["values"] +if value["type"] == 0: +# sparse vector +size = int(value["size"]) +sparse_array = np.zeros(size, dtype=np.float64) +sparse_array[value["indices"]] = value["values"] +return sparse_array + +elif field_type in ["float", "double", "int", "bigint", "smallint"]: + +def converter(value: Any) -> Any: +return value + +else: +raise ValueError( +"SparkPartitionTorchDataset does not support loading data from field of " +f"type {field_type}." +) +return converter def __iter__(self) -> Iterator[Any]: from pyspark.sql.pandas.serializers import ArrowStreamSerializer @@ -71,8 +81,8 @@ class _SparkPartitionTorchDataset(torch.utils.data.IterableDataset): batch_pdf = batch.to_pandas() for row in batch_pdf.itertuples(index=False): yield [ - _SparkPartitionTorchDataset._extract_field_value(value, field_type) -for value, field_type in zip(row, self.field_types) +field_converter(value) +for value, field_converter
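The refactor chooses a converter per column once, when the dataset is built, instead of branching on the type string for every value. Below is a sketch of the same idea without numpy. It assumes the vector struct layout used in the diff: `type` 1 is dense with `values`, and `type` 0 is sparse with `size`, `indices` and `values`. `get_field_converter` and `convert_rows` are hypothetical names, and unlike the diff this sketch raises on an unknown vector type.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List


def get_field_converter(field_type: str) -> Callable[[Any], Any]:
    """Pick the per-value conversion once per column, not once per value."""
    if field_type == "vector":

        def convert_vector(value: Dict[str, Any]) -> List[float]:
            if value["type"] == 1:  # dense vector
                return list(value["values"])
            if value["type"] == 0:  # sparse vector -> dense list
                dense = [0.0] * int(value["size"])
                for idx, val in zip(value["indices"], value["values"]):
                    dense[idx] = val
                return dense
            raise ValueError(f"Unknown vector type {value['type']}.")

        return convert_vector
    if field_type in ["float", "double", "int", "bigint", "smallint"]:
        return lambda value: value  # scalars pass through unchanged
    raise ValueError(f"Unsupported field type {field_type}.")


def convert_rows(rows: Iterable[tuple], field_types: List[str]) -> Iterator[list]:
    converters = [get_field_converter(t) for t in field_types]  # resolved once
    for row in rows:
        yield [conv(value) for value, conv in zip(row, converters)]


rows = [
    ({"type": 0, "size": 4, "indices": [1, 3], "values": [5.0, 7.0]}, 1.0),
    ({"type": 1, "values": [0.5, 0.25]}, 0.0),
]
for converted in convert_rows(rows, ["vector", "double"]):
    print(converted)
```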
[spark] branch master updated: [SPARK-41593][FOLLOW-UP] Fix the case torch distributor logging server not shut down
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e1619653895 [SPARK-41593][FOLLOW-UP] Fix the case torch distributor logging server not shut down e1619653895 is described below commit e1619653895b4d5e11d7121bdb7906355d8c17bf Author: Weichen Xu AuthorDate: Tue May 30 19:13:20 2023 +0800 [SPARK-41593][FOLLOW-UP] Fix the case torch distributor logging server not shut down ### What changes were proposed in this pull request? Fix the case where the torch distributor logging server is not shut down. `_get_spark_task_function` and `_check_encryption` might raise an exception; in that case the logging server must be shut down, but previously it was not. This PR moves those calls inside the `try` block to fix that case. ### Why are the changes needed? To ensure the torch distributor logging server is shut down when setup fails. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests. Closes #41375 from WeichenXu123/improve-torch-distributor-log-server-exception-handling.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/ml/torch/distributor.py | 26 +- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py index ad8b4d8cc25..0249e6b4b2c 100644 --- a/python/pyspark/ml/torch/distributor.py +++ b/python/pyspark/ml/torch/distributor.py @@ -665,20 +665,20 @@ class TorchDistributor(Distributor): time.sleep(1) # wait for the server to start self.log_streaming_server_port = log_streaming_server.port -spark_task_function = self._get_spark_task_function( -framework_wrapper_fn, train_object, spark_dataframe, *args, **kwargs -) -self._check_encryption() -self.logger.info( -f"Started distributed training with {self.num_processes} executor processes" -) -if spark_dataframe is not None: -input_df = spark_dataframe -else: -input_df = self.spark.range( -start=0, end=self.num_tasks, step=1, numPartitions=self.num_tasks -) try: +spark_task_function = self._get_spark_task_function( +framework_wrapper_fn, train_object, spark_dataframe, *args, **kwargs +) +self._check_encryption() +self.logger.info( +f"Started distributed training with {self.num_processes} executor processes" +) +if spark_dataframe is not None: +input_df = spark_dataframe +else: +input_df = self.spark.range( +start=0, end=self.num_tasks, step=1, numPartitions=self.num_tasks +) rows = input_df.mapInArrow( func=spark_task_function, schema="chunk binary", barrier=True ).collect()
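The fix moves setup steps that can raise (`_get_spark_task_function`, `_check_encryption`) inside the `try` block. That way the server shutdown runs on every path. Below is a minimal sketch of the pattern. It assumes the `try` is paired with a `finally` that stops the server, which the truncated diff does not show. `LogServer`, `prepare_task` and `run_training` are hypothetical stand-ins, not the distributor's real classes.

```python
from typing import List


class LogServer:
    """Hypothetical stand-in for the driver-side log streaming server."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def shutdown(self) -> None:
        self.running = False


def prepare_task(fail: bool) -> str:
    # Plays the role of setup steps that may raise (e.g. an encryption check).
    if fail:
        raise RuntimeError("setup failed")
    return "task"


def run_training(server: LogServer, fail_setup: bool) -> List[str]:
    server.start()
    try:
        # Setup lives inside `try`, so a failure here still reaches `finally`.
        task = prepare_task(fail_setup)
        return [task]
    finally:
        server.shutdown()


server = LogServer()
try:
    run_training(server, fail_setup=True)
except RuntimeError as err:
    print(f"error: {err}; server running: {server.running}")
```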
[spark] branch master updated (7c7b9585a2a -> 0e8e4ae47fb)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 7c7b9585a2a [SPARK-43546][PYTHON][CONNECT][TESTS] Complete parity tests of Pandas UDF add 0e8e4ae47fb [SPARK-43516][ML][PYTHON][CONNECT] Base interfaces of sparkML for spark3.5: estimator/transformer/model/evaluator No new revisions were added by this update. Summary of changes: dev/infra/Dockerfile | 2 +- dev/requirements.txt | 2 + dev/sparktestsupport/modules.py| 6 + python/mypy.ini| 3 + python/pyspark/mlv2/__init__.py| 36 python/pyspark/mlv2/base.py| 240 + python/pyspark/mlv2/evaluation.py | 88 python/pyspark/mlv2/feature.py | 124 +++ python/pyspark/mlv2/summarizer.py | 118 ++ .../mlv2/tests/connect/test_parity_evaluation.py | 47 .../mlv2/tests/connect/test_parity_feature.py | 40 .../mlv2/tests/connect/test_parity_summarizer.py | 40 python/pyspark/mlv2/tests/test_evaluation.py | 88 python/pyspark/mlv2/tests/test_feature.py | 98 + python/pyspark/mlv2/tests/test_summarizer.py | 80 +++ python/pyspark/mlv2/util.py| 192 + 16 files changed, 1203 insertions(+), 1 deletion(-) create mode 100644 python/pyspark/mlv2/__init__.py create mode 100644 python/pyspark/mlv2/base.py create mode 100644 python/pyspark/mlv2/evaluation.py create mode 100644 python/pyspark/mlv2/feature.py create mode 100644 python/pyspark/mlv2/summarizer.py create mode 100644 python/pyspark/mlv2/tests/connect/test_parity_evaluation.py create mode 100644 python/pyspark/mlv2/tests/connect/test_parity_feature.py create mode 100644 python/pyspark/mlv2/tests/connect/test_parity_summarizer.py create mode 100644 python/pyspark/mlv2/tests/test_evaluation.py create mode 100644 python/pyspark/mlv2/tests/test_feature.py create mode 100644 python/pyspark/mlv2/tests/test_summarizer.py create mode 100644 python/pyspark/mlv2/util.py
[spark] branch master updated (aed6a47580e -> abd864766b0)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from aed6a47580e [SPARK-43320][SQL][HIVE] Directly call Hive 2.3.9 API add abd864766b0 [SPARK-43081][ML][CONNECT] Add torch distributor data loader that loads data from spark partition data No new revisions were added by this update. Summary of changes: dev/sparktestsupport/modules.py| 2 + .../connect/test_parity_torch_data_loader.py} | 26 ++- python/pyspark/ml/torch/data.py| 79 +++ python/pyspark/ml/torch/distributor.py | 259 ++--- python/pyspark/ml/torch/tests/test_data_loader.py | 136 +++ python/pyspark/ml/torch/tests/test_distributor.py | 2 +- 6 files changed, 461 insertions(+), 43 deletions(-) copy python/pyspark/{pandas/tests/connect/test_parity_dataframe_spark_io.py => ml/tests/connect/test_parity_torch_data_loader.py} (61%) create mode 100644 python/pyspark/ml/torch/data.py create mode 100644 python/pyspark/ml/torch/tests/test_data_loader.py
[spark] branch master updated: [SPARK-42929] make mapInPandas / mapInArrow support "is_barrier"
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a1ac07132b [SPARK-42929] make mapInPandas / mapInArrow support "is_barrier" 2a1ac07132b is described below commit 2a1ac07132b7abc13e56b9a632b3dece7e4b60ea Author: Weichen Xu AuthorDate: Mon Mar 27 17:50:23 2023 +0800 [SPARK-42929] make mapInPandas / mapInArrow support "is_barrier" ### What changes were proposed in this pull request? Make `mapInPandas` / `mapInArrow` support `is_barrier` in Spark Connect. ### Why are the changes needed? Feature parity between Spark Connect and classic PySpark. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Manually: `bin/pyspark --remote local`: ``` import pyarrow df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")) def filter_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] df.mapInPandas(filter_func, df.schema, is_barrier=True).collect() def filter_func(iterator): for batch in iterator: pdf = batch.to_pandas() yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1]) df.mapInArrow(filter_func, df.schema, is_barrier=True).collect() ``` Closes #40559 from WeichenXu123/spark-connect-barrier-mode.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- .../main/protobuf/spark/connect/relations.proto| 3 +++ .../sql/connect/planner/SparkConnectPlanner.scala | 5 ++-- python/pyspark/sql/connect/dataframe.py| 21 +++ python/pyspark/sql/connect/plan.py | 8 +- python/pyspark/sql/connect/proto/relations_pb2.py | 24 - python/pyspark/sql/connect/proto/relations_pb2.pyi | 31 -- 6 files changed, 70 insertions(+), 22 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 976bd68e7fe..c965a6c8d32 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -794,6 +794,9 @@ message MapPartitions { // (Required) Input user-defined function. CommonInlineUserDefinedFunction func = 2; + + // (Optional) isBarrier. + optional bool is_barrier = 3; } message GroupMap { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index e7911ccdf11..e7e88cab643 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -484,19 +484,20 @@ class SparkConnectPlanner(val session: SparkSession) { private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = { val commonUdf = rel.getFunc val pythonUdf = transformPythonUDF(commonUdf) +val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false pythonUdf.evalType match { case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF => logical.MapInPandas( pythonUdf, pythonUdf.dataType.asInstanceOf[StructType].toAttributes, transformRelation(rel.getInput), - false) + isBarrier) case 
PythonEvalType.SQL_MAP_ARROW_ITER_UDF => logical.PythonMapInArrow( pythonUdf, pythonUdf.dataType.asInstanceOf[StructType].toAttributes, transformRelation(rel.getInput), - false) + isBarrier) case _ => throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported") } diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 2dfc8e72193..10426c3c28d 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1623,6 +1623,7 @@ class DataFrame: func: "PandasMapIterFunction", schema: Union[StructType, str], evalType: int, +is_barrier: bool, ) -> "DataFrame": from pyspark.sql.connect.udf import UserDefinedFunction @@ -1636,21 +1637,31 @@ class DataFrame: ) return DataFrame.withPlan( -plan.MapPartitions(child=self._plan, function=udf_obj, cols=self.columns), +plan.MapPartitions( +child=self._plan, function=udf_obj, cols=self.columns, is_barrier=is_b
[spark] branch master updated: [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support barrier mode execution
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 06bf544973f [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support barrier mode execution 06bf544973f is described below commit 06bf544973f4e221c569487473fbe3268543ebb7 Author: Weichen Xu AuthorDate: Mon Mar 27 09:39:48 2023 +0800 [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support barrier mode execution ### What changes were proposed in this pull request? Make mapInPandas / mapInArrow support barrier mode execution ### Why are the changes needed? This is the preparation PR for supporting mapInPandas / mapInArrow barrier execution in spark connect mode. The feature is required by machine learning use cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Closes #40520 from WeichenXu123/barrier-udf. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- .../sql/connect/planner/SparkConnectPlanner.scala | 6 +++-- python/pyspark/sql/pandas/map_ops.py | 26 ++ .../catalyst/analysis/DeduplicateRelations.scala | 4 ++-- .../plans/logical/pythonLogicalOperators.scala | 6 +++-- .../sql/catalyst/analysis/AnalysisSuite.scala | 3 ++- .../main/scala/org/apache/spark/sql/Dataset.scala | 10 + .../spark/sql/execution/SparkStrategies.scala | 8 +++ .../sql/execution/python/MapInBatchExec.scala | 10 - .../sql/execution/python/MapInPandasExec.scala | 3 ++- .../execution/python/PythonMapInArrowExec.scala| 3 ++- 10 files changed, 57 insertions(+), 22 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 13052ec9b01..e7911ccdf11 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -489,12 +489,14 @@ class SparkConnectPlanner(val session: SparkSession) { logical.MapInPandas( pythonUdf, pythonUdf.dataType.asInstanceOf[StructType].toAttributes, - transformRelation(rel.getInput)) + transformRelation(rel.getInput), + false) case PythonEvalType.SQL_MAP_ARROW_ITER_UDF => logical.PythonMapInArrow( pythonUdf, pythonUdf.dataType.asInstanceOf[StructType].toAttributes, - transformRelation(rel.getInput)) + transformRelation(rel.getInput), + false) case _ => throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported") } diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py index 47b17578ae1..a4c0c94844b 100644 --- a/python/pyspark/sql/pandas/map_ops.py +++ b/python/pyspark/sql/pandas/map_ops.py @@ -32,7 +32,7 @@ class PandasMapOpsMixin: """ def mapInPandas( -self, func: 
"PandasMapIterFunction", schema: Union[StructType, str] +self, func: "PandasMapIterFunction", schema: Union[StructType, str], isBarrier: bool = False ) -> "DataFrame": """ Maps an iterator of batches in the current :class:`DataFrame` using a Python native @@ -60,6 +60,7 @@ class PandasMapOpsMixin: schema : :class:`pyspark.sql.types.DataType` or str the return type of the `func` in PySpark. The value can be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. +isBarrier : Use barrier mode execution if True. Examples @@ -74,6 +75,14 @@ class PandasMapOpsMixin: +---+---+ | 1| 21| +---+---+ +>>> # Set isBarrier=True to force the "mapInPandas" stage running in barrier mode, +>>> # it ensures all python UDF workers in the stage will be launched concurrently. +>>> df.mapInPandas(filter_func, df.schema, isBarrier=True).show() # doctest: +SKIP ++---+---+ +| id|age| ++---+---+ +| 1| 21| ++---+---+ Notes - @@ -93,11 +102,11 @@ class PandasMapOpsMixin: func, returnType=schema, functionType=PythonEvalType.SQL_MAP_PANDAS_ITER_UDF ) # type: ignore[call-overload] udf_column = udf(*[self[col] for col in self.columns]) -jdf = self._jdf.mapInPandas(udf_column._jc.expr()) +jdf =
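The `isBarrier` docstring says barrier mode ensures that all Python UDF workers in the stage are launched concurrently. That matters when workers must synchronize with one another, as distributed training does. The sketch below is an analogy only, not Spark's scheduler. It uses `threading.Barrier` to show gang-style coordination: no worker passes the barrier until every worker has arrived, which can only succeed if all of them run at the same time. `NUM_WORKERS`, `worker` and `events` are illustrative names.

```python
import threading
from typing import List

NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)
lock = threading.Lock()
events: List[str] = []


def worker(rank: int) -> None:
    with lock:
        events.append(f"arrived-{rank}")
    # Blocks until all NUM_WORKERS threads have called wait(); if one worker
    # never started, the others would hit the timeout (BrokenBarrierError).
    barrier.wait(timeout=5)
    with lock:
        events.append(f"passed-{rank}")


threads = [threading.Thread(target=worker, args=(r,)) for r in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every "arrived" event is recorded before any "passed" event.
first_passed = min(i for i, e in enumerate(events) if e.startswith("passed"))
print(all(e.startswith("arrived") for e in events[:first_passed]), first_passed)
```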
[spark] branch master updated: [SPARK-42732][PYSPARK][CONNECT] Support spark connect session getActiveSession method
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 753864fedee [SPARK-42732][PYSPARK][CONNECT] Support spark connect session getActiveSession method 753864fedee is described below commit 753864fedee62f638354040063d95b2b3ba93d46 Author: Weichen Xu AuthorDate: Tue Mar 14 18:33:24 2023 +0800 [SPARK-42732][PYSPARK][CONNECT] Support spark connect session getActiveSession method ### What changes were proposed in this pull request? Support the `getActiveSession` method on the Spark Connect session. ### Why are the changes needed? Spark Connect ML needs this API to get the active session in some cases (e.g. fetching model attributes from the server side). ### Does this PR introduce _any_ user-facing change? Yes. Implemented `pyspark.sql.connect.session.SparkSession.getActiveSession` API. ### How was this patch tested? Manually. Closes #40353 from WeichenXu123/spark-connect-get-active-session. Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/sql/connect/session.py | 13 - 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 5e7c8361d80..ffa139eba3e 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -74,6 +74,11 @@ if TYPE_CHECKING: from pyspark.sql.connect.udf import UDFRegistration +# `_active_spark_session` stores the active spark connect session created by +# `SparkSession.builder.getOrCreate`. It is used by ML code.
+_active_spark_session = None + + class SparkSession: class Builder: """Builder for :class:`SparkSession`.""" @@ -119,7 +124,11 @@ class SparkSession: raise NotImplementedError("enableHiveSupport not implemented for Spark Connect") def getOrCreate(self) -> "SparkSession": -return SparkSession(connectionString=self._options["spark.remote"]) +global _active_spark_session +if _active_spark_session is not None: +return _active_spark_session +_active_spark_session = SparkSession(connectionString=self._options["spark.remote"]) +return _active_spark_session _client: SparkConnectClient @@ -434,7 +443,9 @@ class SparkSession: # specifically in Spark Connect the Spark Connect server is designed for # multi-tenancy - the remote client side cannot just stop the server and stop # other remote clients being used from other users. +global _active_spark_session self.client.close() +_active_spark_session = None if "SPARK_LOCAL_REMOTE" in os.environ: # When local mode is in use, follow the regular Spark session's - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
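The caching that SPARK-42732 adds to the Spark Connect builder can be modeled in plain Python. This is a sketch under stated assumptions: `FakeConnectSession`, `get_or_create`, `get_active_session` and the module-level `_active_session` are hypothetical names imitating the `_active_spark_session` global, `getOrCreate` and `stop` in the diff; they are not PySpark APIs. Because the cache is a module global rather than per-thread state, a lookup from another thread sees the same session.

```python
import threading
from typing import Optional

# Hypothetical stand-in for the module-level `_active_spark_session` global.
_active_session: Optional["FakeConnectSession"] = None


class FakeConnectSession:
    """Toy session mirroring the caching added to the Spark Connect builder."""

    def __init__(self, connection_string: str) -> None:
        self.connection_string = connection_string
        self.closed = False

    @classmethod
    def get_or_create(cls, connection_string: str) -> "FakeConnectSession":
        global _active_session
        # Like the diff, return the cached session unconditionally if one exists,
        # even when a different connection string is passed.
        if _active_session is not None:
            return _active_session
        _active_session = cls(connection_string)
        return _active_session

    @classmethod
    def get_active_session(cls) -> Optional["FakeConnectSession"]:
        return _active_session

    def stop(self) -> None:
        global _active_session
        self.closed = True
        # Like the diff, stopping clears the cache so the next call creates anew.
        _active_session = None


if __name__ == "__main__":
    first = FakeConnectSession.get_or_create("sc://localhost:15002")
    second = FakeConnectSession.get_or_create("sc://ignored:15002")
    assert first is second  # the cached session is returned
    seen = []
    worker = threading.Thread(
        target=lambda: seen.append(FakeConnectSession.get_active_session())
    )
    worker.start()
    worker.join()
    assert seen == [first]  # a module global is visible from other threads
    first.stop()
    assert FakeConnectSession.get_active_session() is None
```

One consequence of this design, visible in the sketch, is that the connection string of a second `getOrCreate` call is silently ignored while a session is cached.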
[spark] branch master updated (b414b895ffd -> d5b08f8d99b)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b414b895ffd [SPARK-41994] Assign SQLSTATE's (1/2) add d5b08f8d99b [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py           |   2 +
 python/pyspark/ml/functions.py            | 637 +
 python/pyspark/ml/model_cache.py          |  55 ++
 python/pyspark/ml/tests/test_functions.py | 501
 .../tests/test_model_cache.py}            |  35 +-
 5 files changed, 1218 insertions(+), 12 deletions(-)
 create mode 100644 python/pyspark/ml/model_cache.py
 create mode 100644 python/pyspark/ml/tests/test_functions.py
 copy python/pyspark/{sql/tests/connect/test_parity_datasources.py => ml/tests/test_model_cache.py} (54%)
[spark] branch master updated: [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8871f6dfb82 [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode 8871f6dfb82 is described below commit 8871f6dfb82acd7f44dfa19fc24c877abe3a2fe3 Author: Weichen Xu AuthorDate: Tue Jan 10 21:05:44 2023 +0800 [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Make stage scheduling support local-cluster mode. ### Why are the changes needed? This is useful in testing, especially for test code of third-party python libraries that depends on pyspark, many tests are written with pytest, but pytest is hard to integrate with a standalone spark cluster. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests Closes #39424 from WeichenXu123/stage-sched-local-cluster. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- .../spark/resource/ResourceProfileManager.scala| 12 +- dev/sparktestsupport/modules.py| 1 + python/pyspark/tests/test_stage_sched.py | 153 + 3 files changed, 161 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 3f48aaded5c..9f98d4d9c9c 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -54,7 +54,9 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, private val master = sparkConf.getOption("spark.master") private val isYarn = master.isDefined && master.get.equals("yarn") private val isK8s = master.isDefined && master.get.startsWith("k8s://") - private val isStandalone = master.isDefined && master.get.startsWith("spark://") + private val isStandaloneOrLocalCluster = master.isDefined && ( + master.get.startsWith("spark://") || master.get.startsWith("local-cluster") +) private val notRunningUnitTests = !isTesting private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING) @@ -65,16 +67,16 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, */ private[spark] def isSupported(rp: ResourceProfile): Boolean = { if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) { - if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) { + if ((notRunningUnitTests || testExceptionThrown) && !isStandaloneOrLocalCluster) { throw new SparkException("TaskResourceProfiles are only supported for Standalone " + "cluster for now when dynamic allocation is disabled.") } } else { val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID val notYarnOrK8sOrStandaloneAndNotDefaultProfile = -isNotDefaultProfile && !(isYarn || isK8s || isStandalone) +isNotDefaultProfile && !(isYarn || 
isK8s || isStandaloneOrLocalCluster) val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile = -isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled +isNotDefaultProfile && (isYarn || isK8s || isStandaloneOrLocalCluster) && !dynamicEnabled // We want the exception to be thrown only when we are specifically testing for the // exception or in a real application. Otherwise in all other testing scenarios we want @@ -86,7 +88,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, "and Standalone with dynamic allocation enabled.") } - if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty && + if (isStandaloneOrLocalCluster && dynamicEnabled && rp.getExecutorCores.isEmpty && sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) { logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " + "is explicitly set, you may get more executors allocated than expected. " + diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 655c539cfae..5df495096b7 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -430,6 +430,7 @@ pyspark_core = Module( "pyspark.tests.test_taskcontext&q
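The core of SPARK-41949 is widening one predicate over the `spark.master` URL. A minimal Python transliteration follows, assuming the master URL is a plain (possibly missing) string; the function names are hypothetical, `RuntimeError` stands in for `SparkException`, and only the TaskResourceProfile branch of `isSupported` on the real-application path is modeled (the unit-test escape hatch is omitted). With the change, a `local-cluster[N,C,M]` master (N workers, C cores and M MB of memory per worker) passes the same check as a `spark://` standalone master.

```python
from typing import Optional


def is_standalone_or_local_cluster(master: Optional[str]) -> bool:
    """Python transliteration of the Scala `isStandaloneOrLocalCluster` flag."""
    return master is not None and (
        master.startswith("spark://") or master.startswith("local-cluster")
    )


def check_task_resource_profile(master: Optional[str], dynamic_enabled: bool) -> None:
    """Sketch of the TaskResourceProfile check in `isSupported` (real-application path)."""
    if not dynamic_enabled and not is_standalone_or_local_cluster(master):
        # RuntimeError is a stand-in for Spark's SparkException.
        raise RuntimeError(
            "TaskResourceProfiles are only supported for Standalone "
            "cluster for now when dynamic allocation is disabled."
        )
```

This is why the change helps pytest-based test suites: a `local-cluster` master can be started from inside the test process, whereas a standalone cluster has to be launched separately.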
[spark] branch branch-3.2 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3b9cca7aa32 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes 3b9cca7aa32 is described below commit 3b9cca7aa32f659ea7413abeb373fe7ed069e6f7 Author: Weichen Xu AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms calls OpenBlAS via netlib-java, e.g.: Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e) Signed-off-by: Weichen Xu --- core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 --- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f7d8c799029..f991d2ea09c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -542,6 +542,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser +if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) +} + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 3a3e7e04e7f..d854874c0e8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -131,13 +131,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") -// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores -if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) -} envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
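The defaulting rule in SPARK-41188 can be sketched as a pure function over a configuration dictionary. The function names are hypothetical and the dict stands in for `SparkConf`; `spark.executorEnv.*` entries are copied with the prefix stripped, as `getExecutorEnv` does. The second helper only does the oversubscription arithmetic from the commit message, assuming (as the commit does) that OpenBLAS honors `OMP_NUM_THREADS`: on a 16-core worker running 16 single-CPU tasks, if each concurrent `dppsv` call starts one thread per core, up to 16 × 16 = 256 threads compete for 16 cores, versus 16 threads with `OMP_NUM_THREADS=1`.

```python
from typing import Dict

EXECUTOR_ENV_PREFIX = "spark.executorEnv."


def default_executor_env(conf: Dict[str, str]) -> Dict[str, str]:
    """Sketch of the SPARK-41188 rule; `conf` stands in for SparkConf."""
    # Copy explicit spark.executorEnv.* entries with the prefix stripped.
    env = {
        key[len(EXECUTOR_ENV_PREFIX):]: value
        for key, value in conf.items()
        if key.startswith(EXECUTOR_ENV_PREFIX)
    }
    # Only fill OMP_NUM_THREADS when the user did not set it explicitly.
    if EXECUTOR_ENV_PREFIX + "OMP_NUM_THREADS" not in conf:
        env["OMP_NUM_THREADS"] = conf.get("spark.task.cpus", "1")
    return env


def peak_blas_threads(concurrent_tasks: int, threads_per_call: int) -> int:
    """Threads alive if every concurrent task makes one multithreaded BLAS call."""
    return concurrent_tasks * threads_per_call
```

An explicit `spark.executorEnv.OMP_NUM_THREADS` always wins, so users who want a larger BLAS thread pool per task can still opt in.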
[spark] branch branch-3.3 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new f431cdf0944 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes f431cdf0944 is described below commit f431cdf09442b86133d17fa6e56cb8b77ca4e486 Author: Weichen Xu AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms calls OpenBlAS via netlib-java, e.g.: Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e) Signed-off-by: Weichen Xu --- core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 --- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 02c58d2a9b4..0d0d4fe83a4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -546,6 +546,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser +if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) +} + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index f32c80f3ef5..bf5b862438a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -133,13 +133,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") -// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores -if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) -} envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
[spark] branch master updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 82a41d8ca27 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes 82a41d8ca27 is described below commit 82a41d8ca273e7a9268324c6958f8bb14d9e Author: Weichen Xu AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms calls OpenBlAS via netlib-java, e.g.: Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 --- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d2c7067e596..5cbf2e83371 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -541,6 +541,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser +if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) +} + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 14d5df14ed8..cdb2c620656 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -135,13 +135,6 
@@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") -// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores -if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) -} envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") diff --git a/resource
[spark] branch branch-3.1 updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new a1a2534a01f [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it a1a2534a01f is described below commit a1a2534a01fc776ad043129f3d33795c0f365347 Author: Weichen Xu AuthorDate: Fri Aug 19 12:26:34 2022 +0800 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #37568 from WeichenXu123/SPARK-35542. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 876ce6a5df118095de51c3c4789d6db6da95eb23) Signed-off-by: Weichen Xu --- python/pyspark/ml/tests/test_persistence.py | 17 - python/pyspark/ml/wrapper.py| 6 +- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_persistence.py b/python/pyspark/ml/tests/test_persistence.py index 77a6c030962..8680cb55b2f 100644 --- a/python/pyspark/ml/tests/test_persistence.py +++ b/python/pyspark/ml/tests/test_persistence.py @@ -25,7 +25,7 @@ from pyspark.ml.classification import DecisionTreeClassifier, FMClassifier, \ FMClassificationModel, LogisticRegression, MultilayerPerceptronClassifier, \ MultilayerPerceptronClassificationModel, OneVsRest, OneVsRestModel from pyspark.ml.clustering import KMeans -from pyspark.ml.feature import Binarizer, HashingTF, PCA +from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA from pyspark.ml.linalg import Vectors from pyspark.ml.param import Params from pyspark.ml.pipeline import Pipeline, PipelineModel @@ -461,6 +461,21 @@ class PersistenceTest(SparkSessionTestCase): loadedMetadata = reader._parseMetaData(metadataStr, ) reader.getAndSetParams(lr, loadedMetadata) +# Test for SPARK-35542 fix. 
+def test_save_and_load_on_nested_list_params(self): +temp_path = tempfile.mkdtemp() +splitsArray = [ +[-float("inf"), 0.5, 1.4, float("inf")], +[-float("inf"), 0.1, 1.2, float("inf")], +] +bucketizer = Bucketizer( +splitsArray=splitsArray, inputCols=["values", "values"], outputCols=["b1", "b2"] +) +savePath = temp_path + "/bk" +bucketizer.write().overwrite().save(savePath) +loadedBucketizer = Bucketizer.load(savePath) +assert loadedBucketizer.getSplitsArray() == splitsArray + if __name__ == "__main__": from pyspark.ml.tests.test_persistence import * # noqa: F401 diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 3c8dc03a829..d21d5b92add 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -185,7 +185,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta): java_param = self._java_obj.getParam(param.name) # SPARK-14931: Only check set params back to avoid default params mismatch. if self._java_obj.isSet(java_param): -value = _java2py(sc, self._java_obj.getOrDefault(java_param)) +java_value = self._java_obj.getOrDefault(java_param) +if param.typeConverter.__name__.startswith("toList"): +value = [_java2py(sc, x) for x in list(java_value)] +else: +value = _java2py(sc, java_value) self._set(**{param.name: value}) # SPARK-10931: Temporary fix for params that have a default in Java if self._java_obj.hasDefault(java_param) and not self.isDefined(param):
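The `wrapper.py` change in SPARK-35542 dispatches on the name of the param's type converter and converts list-typed values one element at a time. The toy model below is illustrative only: `FakeJavaArray`, `fake_java2py` and the stub converters are hypothetical, and the real `_java2py` is more involved. The toy converter deliberately unwraps just one level, to show how a single whole-value conversion can leave the inner `splitsArray` rows unconverted while the per-element path does not.

```python
from typing import Any, Callable, Iterator, List


class FakeJavaArray:
    """Hypothetical stand-in for a py4j Java array: iterable, but not a list."""

    def __init__(self, items: List[Any]) -> None:
        self._items = items

    def __iter__(self) -> Iterator[Any]:
        return iter(self._items)


def fake_java2py(value: Any) -> Any:
    """Toy converter (hypothetical): unwraps only one level of FakeJavaArray."""
    if isinstance(value, FakeJavaArray):
        return list(value)
    return value


# Stub converters; only their __name__ matters for the dispatch below.
def toListListFloat(value: Any) -> Any:
    return value


def toFloat(value: Any) -> Any:
    return value


def convert_param_value(type_converter: Callable[[Any], Any], java_value: Any) -> Any:
    """Mirrors the SPARK-35542 branch in JavaParams: list params convert per element."""
    if type_converter.__name__.startswith("toList"):
        return [fake_java2py(x) for x in list(java_value)]
    return fake_java2py(java_value)
```

Dispatching on `__name__` keeps the change small, at the cost of relying on the `toList...` naming convention of the converters.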
[spark] branch branch-3.2 updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3943427b847 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it 3943427b847 is described below commit 3943427b8478d613c25f643553f8731318b0a445 Author: Weichen Xu AuthorDate: Fri Aug 19 12:26:34 2022 +0800 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #37568 from WeichenXu123/SPARK-35542. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 876ce6a5df118095de51c3c4789d6db6da95eb23) Signed-off-by: Weichen Xu --- python/pyspark/ml/tests/test_persistence.py | 17 - python/pyspark/ml/wrapper.py| 6 +- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_persistence.py b/python/pyspark/ml/tests/test_persistence.py index 77a6c030962..8680cb55b2f 100644 --- a/python/pyspark/ml/tests/test_persistence.py +++ b/python/pyspark/ml/tests/test_persistence.py @@ -25,7 +25,7 @@ from pyspark.ml.classification import DecisionTreeClassifier, FMClassifier, \ FMClassificationModel, LogisticRegression, MultilayerPerceptronClassifier, \ MultilayerPerceptronClassificationModel, OneVsRest, OneVsRestModel from pyspark.ml.clustering import KMeans -from pyspark.ml.feature import Binarizer, HashingTF, PCA +from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA from pyspark.ml.linalg import Vectors from pyspark.ml.param import Params from pyspark.ml.pipeline import Pipeline, PipelineModel @@ -461,6 +461,21 @@ class PersistenceTest(SparkSessionTestCase): loadedMetadata = reader._parseMetaData(metadataStr, ) reader.getAndSetParams(lr, loadedMetadata) +# Test for SPARK-35542 fix. 
+def test_save_and_load_on_nested_list_params(self): +temp_path = tempfile.mkdtemp() +splitsArray = [ +[-float("inf"), 0.5, 1.4, float("inf")], +[-float("inf"), 0.1, 1.2, float("inf")], +] +bucketizer = Bucketizer( +splitsArray=splitsArray, inputCols=["values", "values"], outputCols=["b1", "b2"] +) +savePath = temp_path + "/bk" +bucketizer.write().overwrite().save(savePath) +loadedBucketizer = Bucketizer.load(savePath) +assert loadedBucketizer.getSplitsArray() == splitsArray + if __name__ == "__main__": from pyspark.ml.tests.test_persistence import * # noqa: F401 diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 3c8dc03a829..d21d5b92add 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -185,7 +185,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta): java_param = self._java_obj.getParam(param.name) # SPARK-14931: Only check set params back to avoid default params mismatch. if self._java_obj.isSet(java_param): -value = _java2py(sc, self._java_obj.getOrDefault(java_param)) +java_value = self._java_obj.getOrDefault(java_param) +if param.typeConverter.__name__.startswith("toList"): +value = [_java2py(sc, x) for x in list(java_value)] +else: +value = _java2py(sc, java_value) self._set(**{param.name: value}) # SPARK-10931: Temporary fix for params that have a default in Java if self._java_obj.hasDefault(java_param) and not self.isDefined(param):
[spark] branch branch-3.3 updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 87f957dea86 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it 87f957dea86 is described below commit 87f957dea86fe1b8c5979e499b5400866b235e43 Author: Weichen Xu AuthorDate: Fri Aug 19 12:26:34 2022 +0800 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #37568 from WeichenXu123/SPARK-35542. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 876ce6a5df118095de51c3c4789d6db6da95eb23) Signed-off-by: Weichen Xu --- python/pyspark/ml/tests/test_persistence.py | 17 - python/pyspark/ml/wrapper.py| 6 +- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_persistence.py b/python/pyspark/ml/tests/test_persistence.py index 4f09a49dd04..0b54540f06d 100644 --- a/python/pyspark/ml/tests/test_persistence.py +++ b/python/pyspark/ml/tests/test_persistence.py @@ -32,7 +32,7 @@ from pyspark.ml.classification import ( OneVsRestModel, ) from pyspark.ml.clustering import KMeans -from pyspark.ml.feature import Binarizer, HashingTF, PCA +from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA from pyspark.ml.linalg import Vectors from pyspark.ml.param import Params from pyspark.ml.pipeline import Pipeline, PipelineModel @@ -518,6 +518,21 @@ class PersistenceTest(SparkSessionTestCase): ) reader.getAndSetParams(lr, loadedMetadata) +# Test for SPARK-35542 fix. +def test_save_and_load_on_nested_list_params(self): +temp_path = tempfile.mkdtemp() +splitsArray = [ +[-float("inf"), 0.5, 1.4, float("inf")], +[-float("inf"), 0.1, 1.2, float("inf")], +] +bucketizer = Bucketizer( +splitsArray=splitsArray, inputCols=["values", "values"], outputCols=["b1", "b2"] +) +savePath = temp_path + "/bk" +bucketizer.write().overwrite().save(savePath) +loadedBucketizer = Bucketizer.load(savePath) +assert loadedBucketizer.getSplitsArray() == splitsArray + if __name__ == "__main__": from pyspark.ml.tests.test_persistence import * # noqa: F401 diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 7853e766244..32856540d6d 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -220,7 +220,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta): java_param = self._java_obj.getParam(param.name) # SPARK-14931: Only check set params back to avoid default params mismatch. 
if self._java_obj.isSet(java_param): -value = _java2py(sc, self._java_obj.getOrDefault(java_param)) +java_value = self._java_obj.getOrDefault(java_param) +if param.typeConverter.__name__.startswith("toList"): +value = [_java2py(sc, x) for x in list(java_value)] +else: +value = _java2py(sc, java_value) self._set(**{param.name: value}) # SPARK-10931: Temporary fix for params that have a default in Java if self._java_obj.hasDefault(java_param) and not self.isDefined(param):
[spark] branch master updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 876ce6a5df1 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it 876ce6a5df1 is described below commit 876ce6a5df118095de51c3c4789d6db6da95eb23 Author: Weichen Xu AuthorDate: Fri Aug 19 12:26:34 2022 +0800 [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it Signed-off-by: Weichen Xu

### What changes were proposed in this pull request?

Fix a bug where a Bucketizer created for multiple columns, with the parameters splitsArray, inputCols, and outputCols, cannot be loaded after being saved.

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #37568 from WeichenXu123/SPARK-35542.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/ml/tests/test_persistence.py | 17 - python/pyspark/ml/wrapper.py| 6 +- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_persistence.py b/python/pyspark/ml/tests/test_persistence.py index 4f09a49dd04..0b54540f06d 100644 --- a/python/pyspark/ml/tests/test_persistence.py +++ b/python/pyspark/ml/tests/test_persistence.py @@ -32,7 +32,7 @@ from pyspark.ml.classification import ( OneVsRestModel, ) from pyspark.ml.clustering import KMeans -from pyspark.ml.feature import Binarizer, HashingTF, PCA +from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA from pyspark.ml.linalg import Vectors from pyspark.ml.param import Params from pyspark.ml.pipeline import Pipeline, PipelineModel @@ -518,6 +518,21 @@ class PersistenceTest(SparkSessionTestCase): ) reader.getAndSetParams(lr, loadedMetadata) +# Test for SPARK-35542 fix. +def test_save_and_load_on_nested_list_params(self): +temp_path = tempfile.mkdtemp() +splitsArray = [ +[-float("inf"), 0.5, 1.4, float("inf")], +[-float("inf"), 0.1, 1.2, float("inf")], +] +bucketizer = Bucketizer( +splitsArray=splitsArray, inputCols=["values", "values"], outputCols=["b1", "b2"] +) +savePath = temp_path + "/bk" +bucketizer.write().overwrite().save(savePath) +loadedBucketizer = Bucketizer.load(savePath) +assert loadedBucketizer.getSplitsArray() == splitsArray + if __name__ == "__main__": from pyspark.ml.tests.test_persistence import * # noqa: F401 diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 39685ea631e..a83ed4c3d4b 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -220,7 +220,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta): java_param = self._java_obj.getParam(param.name) # SPARK-14931: Only check set params back to avoid default params mismatch. 
if self._java_obj.isSet(java_param): -value = _java2py(sc, self._java_obj.getOrDefault(java_param)) +java_value = self._java_obj.getOrDefault(java_param) +if param.typeConverter.__name__.startswith("toList"): +value = [_java2py(sc, x) for x in list(java_value)] +else: +value = _java2py(sc, java_value) self._set(**{param.name: value}) # SPARK-10931: Temporary fix for params that have a default in Java if self._java_obj.hasDefault(java_param) and not self.isDefined(param):
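The `wrapper.py` hunk changes how a set param is read back from the JVM: when the param's type converter name starts with `toList`, each element of the Java value is passed through `_java2py` separately instead of converting the whole value in one call. The sketch below reproduces only that branching in plain Python, without Spark. `FakeJavaArray`, `shallow_java2py`, and `convert_param_value` are hypothetical names. As a simplifying assumption, the pre-fix failure is modeled as a converter that unwraps only the outer array; this is not necessarily how `_java2py` itself fails. The converter names `toListListFloat`, `toListString`, and `toFloat` are used only to mimic PySpark's `TypeConverters` naming.

```python
from typing import Any, Callable, Iterable, Iterator


class FakeJavaArray:
    """Hypothetical stand-in for a Py4J array handle: iterable, but not a Python list."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._items = list(items)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._items)


def shallow_java2py(value: Any) -> Any:
    """Hypothetical converter that unwraps only the outermost FakeJavaArray."""
    if isinstance(value, FakeJavaArray):
        return list(value)
    return value


def convert_param_value(
    type_converter_name: str,
    java_value: Any,
    java2py: Callable[[Any], Any],
) -> Any:
    """Mirror of the branch added to wrapper.py by SPARK-35542 (sketch only)."""
    if type_converter_name.startswith("toList"):
        # List-typed params: convert each element separately, so every inner
        # array of a nested param such as splitsArray is converted too.
        return [java2py(x) for x in list(java_value)]
    # All other params: one conversion of the whole value, as before the fix.
    return java2py(java_value)


if __name__ == "__main__":
    inf = float("inf")
    splits = FakeJavaArray([
        FakeJavaArray([-inf, 0.5, 1.4, inf]),
        FakeJavaArray([-inf, 0.1, 1.2, inf]),
    ])
    before = shallow_java2py(splits)  # inner arrays are still FakeJavaArray handles
    after = convert_param_value("toListListFloat", splits, shallow_java2py)
    print(type(before[0]).__name__, after)
```

Converting element by element keeps the change local to list-typed params, so scalar params go through exactly the same path as before.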
[spark] branch branch-3.1 updated: [SPARK-40079] Add Imputer inputCols validation for empty input case
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new f2453e8a129 [SPARK-40079] Add Imputer inputCols validation for empty input case f2453e8a129 is described below commit f2453e8a1293b367c2d7794e7f37978cc848aebc Author: Weichen Xu AuthorDate: Mon Aug 15 18:03:08 2022 +0800 [SPARK-40079] Add Imputer inputCols validation for empty input case Signed-off-by: Weichen Xu

### What changes were proposed in this pull request?

Add Imputer inputCols validation for the empty-input case.

### Why are the changes needed?

If Imputer's inputCols is empty, `fit` succeeds, but saving the resulting model raises an error:

> AnalysisException: Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 87094f89655b7df09cdecb47c653461ae855b0ac) Signed-off-by: Weichen Xu --- mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala | 1 + .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala | 10 ++ 2 files changed, 11 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index 71403acc91b..5998887923f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp protected def validateAndTransformSchema(schema: StructType): StructType = { ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) val (inputColNames, outputColNames) = getInOutCols() +require(inputColNames.length > 0, "inputCols cannot be empty") require(inputColNames.length == inputColNames.distinct.length, s"inputCols contains" + s" duplicates: (${inputColNames.mkString(", ")})") require(outputColNames.length == outputColNames.distinct.length, s"outputCols contains" + diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala index 30887f55638..5ef22a282c3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala @@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } assert(e.getMessage.contains("outputCols contains duplicates")) } + + withClue("Imputer should fail if inputCols param is empty.") { +val e: IllegalArgumentException = intercept[IllegalArgumentException] { + val imputer = new Imputer().setStrategy(strategy) +.setInputCols(Array[String]()) +.setOutputCols(Array[String]()) + val 
model = imputer.fit(df) +} +assert(e.getMessage.contains("inputCols cannot be empty")) + } } }
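The new `require` runs before the existing duplicate checks in `validateAndTransformSchema`, so an empty `inputCols` now fails during `fit` instead of when the model is saved. The following is a rough Python analogue of only the checks visible in the hunk. The function name is hypothetical, `ValueError` stands in for Scala's `IllegalArgumentException`, and the `outputCols` message format is assumed to mirror the `inputCols` one, because the hunk truncates it.

```python
from typing import Sequence, Tuple


def validate_in_out_cols(
    input_cols: Sequence[str], output_cols: Sequence[str]
) -> Tuple[Sequence[str], Sequence[str]]:
    """Hypothetical analogue of the column checks in ImputerParams.validateAndTransformSchema."""
    # New check from SPARK-40079: reject an empty column list up front,
    # instead of letting fit() succeed and the later model save fail.
    if len(input_cols) == 0:
        raise ValueError("inputCols cannot be empty")
    # Pre-existing checks: no duplicate input or output column names.
    if len(input_cols) != len(set(input_cols)):
        raise ValueError(f"inputCols contains duplicates: ({', '.join(input_cols)})")
    if len(output_cols) != len(set(output_cols)):
        # Message format assumed; the Scala source is truncated in the diff.
        raise ValueError(f"outputCols contains duplicates: ({', '.join(output_cols)})")
    return input_cols, output_cols
```

Ordering the emptiness check first means an empty list yields the targeted message rather than passing the duplicate checks vacuously.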
[spark] branch branch-3.2 updated: [SPARK-40079] Add Imputer inputCols validation for empty input case
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 2b54b48cd85 [SPARK-40079] Add Imputer inputCols validation for empty input case 2b54b48cd85 is described below commit 2b54b48cd852f93e8cf24397df6f3ec5b755233e Author: Weichen Xu AuthorDate: Mon Aug 15 18:03:08 2022 +0800 [SPARK-40079] Add Imputer inputCols validation for empty input case Signed-off-by: Weichen Xu

### What changes were proposed in this pull request?

Add Imputer inputCols validation for the empty-input case.

### Why are the changes needed?

If Imputer's inputCols is empty, `fit` succeeds, but saving the resulting model raises an error:

> AnalysisException: Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 87094f89655b7df09cdecb47c653461ae855b0ac) Signed-off-by: Weichen Xu --- mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala | 1 + .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala | 10 ++ 2 files changed, 11 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index 71403acc91b..5998887923f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp protected def validateAndTransformSchema(schema: StructType): StructType = { ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) val (inputColNames, outputColNames) = getInOutCols() +require(inputColNames.length > 0, "inputCols cannot be empty") require(inputColNames.length == inputColNames.distinct.length, s"inputCols contains" + s" duplicates: (${inputColNames.mkString(", ")})") require(outputColNames.length == outputColNames.distinct.length, s"outputCols contains" + diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala index 30887f55638..5ef22a282c3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala @@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } assert(e.getMessage.contains("outputCols contains duplicates")) } + + withClue("Imputer should fail if inputCols param is empty.") { +val e: IllegalArgumentException = intercept[IllegalArgumentException] { + val imputer = new Imputer().setStrategy(strategy) +.setInputCols(Array[String]()) +.setOutputCols(Array[String]()) + val 
model = imputer.fit(df) +} +assert(e.getMessage.contains("inputCols cannot be empty")) + } } }
[spark] branch branch-3.3 updated: [SPARK-40079] Add Imputer inputCols validation for empty input case
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 2ee196dbb0b [SPARK-40079] Add Imputer inputCols validation for empty input case 2ee196dbb0b is described below commit 2ee196dbb0bf9ecfd96a1928cbaf15b7c3856d3d Author: Weichen Xu AuthorDate: Mon Aug 15 18:03:08 2022 +0800 [SPARK-40079] Add Imputer inputCols validation for empty input case Signed-off-by: Weichen Xu

### What changes were proposed in this pull request?

Add Imputer inputCols validation for the empty-input case.

### Why are the changes needed?

If Imputer's inputCols is empty, `fit` succeeds, but saving the resulting model raises an error:

> AnalysisException: Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 87094f89655b7df09cdecb47c653461ae855b0ac) Signed-off-by: Weichen Xu --- mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala | 1 + .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala | 10 ++ 2 files changed, 11 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index 71403acc91b..5998887923f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp protected def validateAndTransformSchema(schema: StructType): StructType = { ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) val (inputColNames, outputColNames) = getInOutCols() +require(inputColNames.length > 0, "inputCols cannot be empty") require(inputColNames.length == inputColNames.distinct.length, s"inputCols contains" + s" duplicates: (${inputColNames.mkString(", ")})") require(outputColNames.length == outputColNames.distinct.length, s"outputCols contains" + diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala index 30887f55638..5ef22a282c3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala @@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } assert(e.getMessage.contains("outputCols contains duplicates")) } + + withClue("Imputer should fail if inputCols param is empty.") { +val e: IllegalArgumentException = intercept[IllegalArgumentException] { + val imputer = new Imputer().setStrategy(strategy) +.setInputCols(Array[String]()) +.setOutputCols(Array[String]()) + val 
model = imputer.fit(df) +} +assert(e.getMessage.contains("inputCols cannot be empty")) + } } }
[spark] branch master updated: [SPARK-40079] Add Imputer inputCols validation for empty input case
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 87094f89655 [SPARK-40079] Add Imputer inputCols validation for empty input case 87094f89655 is described below commit 87094f89655b7df09cdecb47c653461ae855b0ac Author: Weichen Xu AuthorDate: Mon Aug 15 18:03:08 2022 +0800 [SPARK-40079] Add Imputer inputCols validation for empty input case Signed-off-by: Weichen Xu

### What changes were proposed in this pull request?

Add Imputer inputCols validation for the empty-input case.

### Why are the changes needed?

If Imputer's inputCols is empty, `fit` succeeds, but saving the resulting model raises an error:

> AnalysisException: Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala | 1 + .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala | 10 ++ 2 files changed, 11 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index 71403acc91b..5998887923f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp protected def validateAndTransformSchema(schema: StructType): StructType = { ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) val (inputColNames, outputColNames) = getInOutCols() +require(inputColNames.length > 0, "inputCols cannot be empty") require(inputColNames.length == inputColNames.distinct.length, s"inputCols contains" + s" duplicates: (${inputColNames.mkString(", ")})") require(outputColNames.length == outputColNames.distinct.length, s"outputCols contains" + diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala index 30887f55638..5ef22a282c3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala @@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } assert(e.getMessage.contains("outputCols contains duplicates")) } + + withClue("Imputer should fail if inputCols param is empty.") { +val e: IllegalArgumentException = intercept[IllegalArgumentException] { + val imputer = new Imputer().setStrategy(strategy) +.setInputCols(Array[String]()) +.setOutputCols(Array[String]()) + val model = imputer.fit(df) +} +assert(e.getMessage.contains("inputCols cannot be empty")) + } } } - 
[spark] branch master updated: [SPARK-39071][SQL][PYTHON] Add unwrap_udt function for unwrapping UserDefinedType columns
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new daedcd29630 [SPARK-39071][SQL][PYTHON] Add unwrap_udt function for unwrapping UserDefinedType columns daedcd29630 is described below commit daedcd29630de42f6d0858e1e693f4a3d9caf1aa Author: Weichen Xu AuthorDate: Tue May 10 19:29:05 2022 +0800 [SPARK-39071][SQL][PYTHON] Add unwrap_udt function for unwrapping UserDefinedType columns

### What changes were proposed in this pull request?

Add an `unwrap_udt` function for unwrapping UserDefinedType columns.

### Why are the changes needed?

This is useful for the open-source project https://github.com/mengxr/pyspark-xgboost

### Does this PR introduce _any_ user-facing change?

Yes. A new SQL function, `unwrap_udt`, is added.

### How was this patch tested?

Unit test.

Closes #36408 from WeichenXu123/unwrapt_udt.

Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- python/pyspark/ml/tests/test_linalg.py | 14 +++ python/pyspark/sql/functions.py| 10 + .../spark/sql/catalyst/expressions/UnwrapUDT.scala | 49 ++ .../scala/org/apache/spark/sql/functions.scala | 9 .../apache/spark/sql/UserDefinedTypeSuite.scala| 10 + 5 files changed, 92 insertions(+) diff --git a/python/pyspark/ml/tests/test_linalg.py b/python/pyspark/ml/tests/test_linalg.py index dfdd32e98eb..a6e9f4e752e 100644 --- a/python/pyspark/ml/tests/test_linalg.py +++ b/python/pyspark/ml/tests/test_linalg.py @@ -33,6 +33,7 @@ from pyspark.ml.linalg import ( ) from pyspark.testing.mllibutils import MLlibTestCase from pyspark.sql import Row +from pyspark.sql.functions import unwrap_udt class VectorTests(MLlibTestCase): @@ -351,6 +352,19 @@ class VectorUDTTests(MLlibTestCase): else: raise TypeError("expecting a vector but got %r of type %r" % (v, type(v))) +def test_unwrap_udt(self): +df = self.spark.createDataFrame( +[(Vectors.dense(1.0, 2.0, 3.0),),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}),)], +["vec"], +) +results = df.select(unwrap_udt("vec").alias("v2")).collect() +unwrapped_vec = Row("type", "size", "indices", "values") +expected = [ +Row(v2=unwrapped_vec(1, None, None, [1.0, 2.0, 3.0])), +Row(v2=unwrapped_vec(0, 3, [1, 2], [1.0, 5.5])), +] +self.assertEquals(results, expected) + class MatrixUDTTests(MLlibTestCase): diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 019f64b5171..aa9aa5ed51b 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -5318,6 +5318,16 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column: return _invoke_function("bucket", numBuckets, _to_java_column(col)) +def unwrap_udt(col: "ColumnOrName") -> Column: +""" +Unwrap UDT data type column into its underlying type. + +.. versionadded:: 3.4.0 + +""" +return _invoke_function("unwrap_udt", _to_java_column(col)) + + # User Defined Function -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnwrapUDT.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnwrapUDT.scala new file mode 100644 index 000..cb740672af3 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnwrapUDT.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spar
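The expected rows in `test_unwrap_udt` show what a `VectorUDT` column unwraps to: a struct of `type` (1 for dense, 0 for sparse), `size`, `indices`, and `values`, where `size` and `indices` are null for a dense vector. The plain-Python sketch below reproduces that layout for the two vectors used in the test. It does not call Spark, and `UnwrappedVector`, `unwrap_dense`, and `unwrap_sparse` are hypothetical helpers. Sorting the sparse indices is an assumption that matches how the test's dict input lines up with its expected `[1, 2]`.

```python
from typing import Dict, List, NamedTuple, Optional


class UnwrappedVector(NamedTuple):
    """Field layout taken from the expected rows in test_unwrap_udt."""

    type: int                     # 1 = dense, 0 = sparse
    size: Optional[int]           # None (null) for dense vectors
    indices: Optional[List[int]]  # None (null) for dense vectors
    values: List[float]


def unwrap_dense(values: List[float]) -> UnwrappedVector:
    """Toy equivalent of unwrapping Vectors.dense(*values)."""
    return UnwrappedVector(1, None, None, list(values))


def unwrap_sparse(size: int, entries: Dict[int, float]) -> UnwrappedVector:
    """Toy equivalent of unwrapping Vectors.sparse(size, entries)."""
    indices = sorted(entries)
    return UnwrappedVector(0, size, indices, [entries[i] for i in indices])


if __name__ == "__main__":
    print(unwrap_dense([1.0, 2.0, 3.0]))
    print(unwrap_sparse(3, {1: 1.0, 2: 5.5}))
```

A dense vector needs only its values, which is why the test expects `size` and `indices` to be null for the dense row.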
[spark] branch master updated: [SPARK-36642][SQL] Add df.withMetadata pyspark API
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ef7441b [SPARK-36642][SQL] Add df.withMetadata pyspark API ef7441b is described below commit ef7441bb4245e2fb08d70a8025a2c6d8bebfe75b Author: Liang Zhang AuthorDate: Fri Sep 24 08:30:18 2021 +0800 [SPARK-36642][SQL] Add df.withMetadata pyspark API

This PR adds the PySpark API `df.withMetadata(columnName, metadata)`. The Scala API was added in https://github.com/apache/spark/pull/33853.

### What changes were proposed in this pull request?

To make semantic annotations easier to use and modify, we want a shorter API for updating column metadata in a DataFrame. Currently, updating the metadata without changing the column name requires `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`, which is too verbose. We want a syntactic-sugar API, `df.withMetadata("col1", metadata=metadata)`, that achieves the same result.

### Why are the changes needed?

Some background on how often metadata is updated: we are working on inferring semantic data types, using them in AutoML, and storing the semantic annotations in column metadata. In many cases, we will therefore suggest that the user update the metadata, either to correct a wrong inference or to add an annotation where the inference is weak.

### Does this PR introduce _any_ user-facing change?

Yes. A syntactic-sugar API, `df.withMetadata("col1", metadata=metadata)`, which achieves the same result as `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`.

### How was this patch tested?

Doctest.

Closes #34021 from liangz1/withMetadataPython.
Authored-by: Liang Zhang Signed-off-by: Weichen Xu --- python/pyspark/sql/dataframe.py | 27 +++ 1 file changed, 27 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 081c06e..5a2e8cf 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -15,6 +15,7 @@ # limitations under the License. # +import json import sys import random import warnings @@ -23,6 +24,7 @@ from functools import reduce from html import escape as html_escape from pyspark import copy_func, since, _NoValue +from pyspark.context import SparkContext from pyspark.rdd import RDD, _load_from_socket, _local_iterator_from_socket from pyspark.serializers import BatchedSerializer, PickleSerializer, \ UTF8Deserializer @@ -2536,6 +2538,31 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): """ return DataFrame(self._jdf.withColumnRenamed(existing, new), self.sql_ctx) +def withMetadata(self, columnName, metadata): +"""Returns a new :class:`DataFrame` by updating an existing column with metadata. + +.. versionadded:: 3.3.0 + +Parameters +-- +columnName : str +string, name of the existing column to update the metadata. +metadata : dict +dict, new metadata to be assigned to df.schema[columnName].metadata + +Examples + +>>> df_meta = df.withMetadata('age', {'foo': 'bar'}) +>>> df_meta.schema['age'].metadata +{'foo': 'bar'} +""" +if not isinstance(metadata, dict): +raise TypeError("metadata should be a dict") +sc = SparkContext._active_spark_context +jmeta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson( +json.dumps(metadata)) +return DataFrame(self._jdf.withMetadata(columnName, jmeta), self.sql_ctx) + def drop(self, *cols): """Returns a new :class:`DataFrame` that drops the specified column. This is a no-op if schema doesn't contain the given column name(s).
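The PySpark wrapper checks that `metadata` is a dict, serializes it with `json.dumps`, and passes the string to the JVM's `Metadata.fromJson`. The Scala method then replaces the column's metadata via `withColumn(columnName, col(columnName), metadata)`, and the Scala test expects an `AnalysisException` mentioning "Cannot resolve column name" for an unknown column. The toy below models those semantics in plain Python on a dict-based schema. It is not Spark's implementation: the schema representation and the `with_metadata` name are hypothetical, and `ValueError` stands in for `AnalysisException`.

```python
import copy
import json
from typing import Any, Dict

# Toy schema: column name -> metadata dict (a stand-in for a StructType).
Schema = Dict[str, Dict[str, Any]]


def with_metadata(schema: Schema, column_name: str, metadata: Dict[str, Any]) -> Schema:
    """Return a copy of `schema` whose `column_name` metadata is replaced by `metadata`."""
    if not isinstance(metadata, dict):
        # Same check and message as the PySpark wrapper.
        raise TypeError("metadata should be a dict")
    if column_name not in schema:
        # Stand-in for the AnalysisException raised on the Scala side.
        raise ValueError(f"Cannot resolve column name {column_name!r}")
    # The wrapper ships metadata to the JVM as a JSON string; this mimics
    # that round trip (json.dumps raises TypeError on unserializable values).
    payload = json.dumps(metadata)
    new_schema = copy.deepcopy(schema)  # the input schema is left unchanged
    new_schema[column_name] = json.loads(payload)
    return new_schema


if __name__ == "__main__":
    schema = {"age": {}, "name": {}}
    print(with_metadata(schema, "age", {"foo": "bar"})["age"])
```

Note that the new metadata replaces the old one wholesale rather than merging into it, matching the "replace metadata of a column" test name in the Scala suite.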
[spark] branch master updated: [SPARK-36642][SQL] Add df.withMetadata: a syntax suger to update the metadata of a dataframe
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cb30683 [SPARK-36642][SQL] Add df.withMetadata: a syntax suger to update the metadata of a dataframe cb30683 is described below commit cb30683b65714bd1c1ae15d31d1411aeb66cef12 Author: Liang Zhang AuthorDate: Wed Sep 8 09:35:18 2021 +0800 [SPARK-36642][SQL] Add df.withMetadata: a syntax suger to update the metadata of a dataframe

### What changes were proposed in this pull request?

To make semantic annotations easier to use and modify, we want a shorter API for updating column metadata in a DataFrame. Currently, updating the metadata without changing the column name requires `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`, which is too verbose. We want a syntactic-sugar API, `df.withMetadata("col1", metadata=metadata)`, that achieves the same result.

### Why are the changes needed?

Some background on how often metadata is updated: we are working on inferring semantic data types, using them in AutoML, and storing the semantic annotations in column metadata. In many cases, we will therefore suggest that the user update the metadata, either to correct a wrong inference or to add an annotation where the inference is weak.

### Does this PR introduce _any_ user-facing change?

Yes. A syntactic-sugar API, `df.withMetadata("col1", metadata=metadata)`, which achieves the same result as `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`.

### How was this patch tested?

A unit test in DataFrameSuite.scala.

Closes #33853 from liangz1/withMetadata.
Authored-by: Liang Zhang Signed-off-by: Weichen Xu --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 10 ++ .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 12 2 files changed, 22 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index bd3411d..1e85551 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2492,6 +2492,16 @@ class Dataset[T] private[sql]( } /** + * Returns a new Dataset by updating an existing column with metadata. + * + * @group untypedrel + * @since 3.3.0 + */ + def withMetadata(columnName: String, metadata: Metadata): DataFrame = { +withColumn(columnName, col(columnName), metadata) + } + + /** * Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain * column name. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index bf161a1..54fb90a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -702,6 +702,18 @@ class DataFrameSuite extends QueryTest "The size of column names: 2 isn't equal to the size of metadata elements: 1")) } + test("SPARK-36642: withMetadata: replace metadata of a column") { +val metadata = new MetadataBuilder().putLong("key", 1L).build() +val df1 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x") +val df2 = df1.withMetadata("x", metadata) +assert(df2.schema(0).metadata === metadata) + +val err = intercept[AnalysisException] { + df1.withMetadata("x1", metadata) +} +assert(err.getMessage.contains("Cannot resolve column name")) + } + test("replace column using withColumn") { val df2 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x") val df3 = df2.withColumn("x", df2("x") + 1)
[spark] branch branch-3.1 updated: [SPARK-35142][PYTHON][ML] Fix incorrect return type for `rawPredictionUDF` in `OneVsRestModel`
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 0208810 [SPARK-35142][PYTHON][ML] Fix incorrect return type for `rawPredictionUDF` in `OneVsRestModel` 0208810 is described below commit 0208810b93e234b822bc972f4236bf04bf521e7d Author: harupy <17039389+har...@users.noreply.github.com> AuthorDate: Wed Apr 21 16:29:10 2021 +0800 [SPARK-35142][PYTHON][ML] Fix incorrect return type for `rawPredictionUDF` in `OneVsRestModel` ### What changes were proposed in this pull request? Fixes incorrect return type for `rawPredictionUDF` in `OneVsRestModel`. ### Why are the changes needed? Bugfix ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. Closes #32245 from harupy/SPARK-35142. Authored-by: harupy <17039389+har...@users.noreply.github.com> Signed-off-by: Weichen Xu (cherry picked from commit b6350f5bb00f99a060953850b069a419b70c329e) Signed-off-by: Weichen Xu --- python/pyspark/ml/classification.py| 4 ++-- python/pyspark/ml/tests/test_algorithms.py | 14 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0553a61..17994ed 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -40,7 +40,7 @@ from pyspark.ml.util import DefaultParamsReader, DefaultParamsWriter, \ from pyspark.ml.wrapper import JavaParams, \ JavaPredictor, JavaPredictionModel, JavaWrapper from pyspark.ml.common import inherit_doc -from pyspark.ml.linalg import Vectors +from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql import DataFrame from pyspark.sql.functions import udf, when from pyspark.sql.types import ArrayType, DoubleType @@ -3151,7 +3151,7 @@ class OneVsRestModel(Model, _OneVsRestParams, MLReadable, 
MLWritable): predArray.append(x) return Vectors.dense(predArray) -rawPredictionUDF = udf(func) +rawPredictionUDF = udf(func, VectorUDT()) aggregatedDataset = aggregatedDataset.withColumn( self.getRawPredictionCol(), rawPredictionUDF(aggregatedDataset[accColName])) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index 5047521..35ce48b 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -25,7 +25,7 @@ from pyspark.ml.classification import FMClassifier, LogisticRegression, \ MultilayerPerceptronClassifier, OneVsRest from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel from pyspark.ml.fpm import FPGrowth -from pyspark.ml.linalg import Matrices, Vectors +from pyspark.ml.linalg import Matrices, Vectors, DenseVector from pyspark.ml.recommendation import ALS from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression from pyspark.sql import Row @@ -116,6 +116,18 @@ class OneVsRestTests(SparkSessionTestCase): output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) +def test_raw_prediction_column_is_of_vector_type(self): +# SPARK-35142: `OneVsRestModel` outputs raw prediction as a string column +df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], +["label", "features"]) +lr = LogisticRegression(maxIter=5, regParam=0.01) +ovr = OneVsRest(classifier=lr, parallelism=1) +model = ovr.fit(df) +row = model.transform(df).head() +self.assertIsInstance(row["rawPrediction"], DenseVector) + def test_parallelism_does_not_change_output(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (43ad939 -> b6350f5)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 43ad939 [SPARK-35152][SQL] ANSI mode: IntegralDivide throws exception on overflow add b6350f5 [SPARK-35142][PYTHON][ML] Fix incorrect return type for `rawPredictionUDF` in `OneVsRestModel` No new revisions were added by this update. Summary of changes: python/pyspark/ml/classification.py| 4 ++-- python/pyspark/ml/tests/test_algorithms.py | 14 +- 2 files changed, 15 insertions(+), 3 deletions(-)
[spark] branch branch-3.1 updated: [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: cleanup
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 4cf94f3 [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: cleanup 4cf94f3 is described below commit 4cf94f3e33de968b0048690cb0128437736d60a7 Author: Ruifeng Zheng AuthorDate: Tue Jan 26 11:57:28 2021 +0800 [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: cleanup ### What changes were proposed in this pull request? 1, make `silhouette` a method; 2, change return type of `setDistanceMeasure` to `this.type`; ### Why are the changes needed? see comments in https://github.com/apache/spark/pull/28590 ### Does this PR introduce _any_ user-facing change? No, 3.1 has not been released ### How was this patch tested? existing testsuites Closes #31334 from zhengruifeng/31768-followup. Authored-by: Ruifeng Zheng Signed-off-by: Weichen Xu (cherry picked from commit cb37c962bec25083a67d65387ed88e7d4ee556ca) Signed-off-by: Weichen Xu --- .../scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala index 3dea244..3035688 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala @@ -37,13 +37,18 @@ class ClusteringMetrics private[spark](dataset: Dataset[_]) { def getDistanceMeasure: String = distanceMeasure - def setDistanceMeasure(value: String) : Unit = distanceMeasure = value + def setDistanceMeasure(value: String) : this.type = { +require(value.equalsIgnoreCase("squaredEuclidean") || + value.equalsIgnoreCase("cosine")) +distanceMeasure = value +this + } /** * Returns the 
silhouette score */ @Since("3.1.0") - lazy val silhouette: Double = { + def silhouette(): Double = { val columns = dataset.columns.toSeq if (distanceMeasure.equalsIgnoreCase("squaredEuclidean")) { SquaredEuclideanSilhouette.computeSilhouetteScore(
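The commit message defers its rationale to the PR discussion. The Scala semantics behind the two changes are still worth spelling out. A `lazy val` is evaluated once, on first access, and then cached, so a later `setDistanceMeasure` call could not change an already-computed `silhouette`. A `def` recomputes on every call. Returning `this.type` makes the validated setter chainable. The Python sketch below mimics both designs with hypothetical `CachedMetrics` and `Metrics` classes (not Spark APIs). The string it returns in place of a real score is an assumption, used only to make the caching difference visible.

```python
from functools import cached_property

# Case-insensitive set of accepted measures, mirroring the Scala require(...).
VALID_MEASURES = {"squaredeuclidean", "cosine"}


class CachedMetrics:
    """Hypothetical analogue of the old design: score cached like a Scala lazy val."""

    def __init__(self):
        self.distance_measure = "squaredEuclidean"

    def set_distance_measure(self, value):
        # Old-style setter: no validation, returns None (like Unit).
        self.distance_measure = value

    @cached_property
    def silhouette(self):
        # Stand-in "score" that just records which measure was used.
        return f"silhouette({self.distance_measure})"


class Metrics:
    """Hypothetical analogue of the new design: validated, chainable setter plus a method."""

    def __init__(self):
        self.distance_measure = "squaredEuclidean"

    def set_distance_measure(self, value):
        # Reject anything other than the two supported measures.
        if value.lower() not in VALID_MEASURES:
            raise ValueError(f"unsupported distance measure: {value}")
        self.distance_measure = value
        return self  # analogue of returning this.type, enabling chaining

    def silhouette(self):
        # Recomputed on every call, so it always reflects the current measure.
        return f"silhouette({self.distance_measure})"


old = CachedMetrics()
first = old.silhouette            # computed and cached with squaredEuclidean
old.set_distance_measure("cosine")
stale = old.silhouette            # still the cached squaredEuclidean value

fresh = Metrics().set_distance_measure("cosine").silhouette()
```

With the cached design, `stale` still reports `squaredEuclidean` after the setter runs. The method-based design reports `cosine`.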
[spark] branch master updated (d1177b5 -> cb37c96)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from d1177b5 [SPARK-34192][SQL] Move char padding to write side and remove length check on read side too add cb37c96 [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: cleanup No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-)
[spark] branch branch-3.0 updated: [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5a93bcb [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s 5a93bcb is described below commit 5a93bcbcc27d7d2f1e816e466478f67d4e3df105 Author: Liang Zhang AuthorDate: Wed Jan 20 08:26:52 2021 +0800 [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s ### What changes were proposed in this pull request? Increase the timeout for StreamingLinearRegressionSuite to 60s to deflake the test. ### Why are the changes needed? Reduce merge conflict. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #31248 from liangz1/increase-timeout. Authored-by: Liang Zhang Signed-off-by: Weichen Xu (cherry picked from commit f7ff7ff0a5a251d7983d933dbe8c20882356e5bf) Signed-off-by: Weichen Xu --- .../apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 8e2d7d1..b8342f8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -31,7 +31,7 @@ class StreamingLinearRegressionSuite with TestSuiteBase { // use longer wait time to ensure job completion - override def maxWaitTimeMillis: Int = 20000 + override def maxWaitTimeMillis: Int = 60000 // Assert that two values are equal within tolerance epsilon def assertEqual(v1: Double, v2: Double, epsilon: Double): Unit = {
[spark] branch branch-3.1 updated: [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 32e61b0 [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s 32e61b0 is described below commit 32e61b0a551efac0c1a471a9f8c307e2dec502db Author: Liang Zhang AuthorDate: Wed Jan 20 08:26:52 2021 +0800 [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s ### What changes were proposed in this pull request? Increase the timeout for StreamingLinearRegressionSuite to 60s to deflake the test. ### Why are the changes needed? Reduce merge conflict. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #31248 from liangz1/increase-timeout. Authored-by: Liang Zhang Signed-off-by: Weichen Xu (cherry picked from commit f7ff7ff0a5a251d7983d933dbe8c20882356e5bf) Signed-off-by: Weichen Xu --- .../apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 8e2d7d1..b8342f8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -31,7 +31,7 @@ class StreamingLinearRegressionSuite with TestSuiteBase { // use longer wait time to ensure job completion - override def maxWaitTimeMillis: Int = 20000 + override def maxWaitTimeMillis: Int = 60000 // Assert that two values are equal within tolerance epsilon def assertEqual(v1: Double, v2: Double, epsilon: Double): Unit = {
[spark] branch master updated (32dad1d -> f7ff7ff)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 32dad1d [SPARK-34149][SQL] Refresh cache in v2 `ALTER TABLE .. ADD PARTITION` add f7ff7ff [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s No new revisions were added by this update. Summary of changes: .../apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated (66cc129 -> f354883)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 66cc129 [SPARK-34132][DOCS][R] Update Roxygen version references to 7.1.1 add f354883 [SPARK-34080][ML][PYTHON] Add UnivariateFeatureSelector No new revisions were added by this update. Summary of changes: docs/ml-features.md| 111 +--- docs/ml-statistics.md | 54 +- .../spark/examples/ml/JavaANOVATestExample.java| 75 --- .../examples/ml/JavaFValueSelectorExample.java | 81 --- .../spark/examples/ml/JavaFValueTestExample.java | 75 --- ...a => JavaUnivariateFeatureSelectorExample.java} | 21 +- examples/src/main/python/ml/anova_test_example.py | 50 -- .../src/main/python/ml/fvalue_selector_example.py | 53 -- examples/src/main/python/ml/fvalue_test_example.py | 50 -- ...e.py => univariate_feature_selector_example.py} | 16 +- .../spark/examples/ml/ANOVATestExample.scala | 63 -- .../spark/examples/ml/FValueSelectorExample.scala | 69 --- .../spark/examples/ml/FValueTestExample.scala | 63 -- ...cala => UnivariateFeatureSelectorExample.scala} | 20 +- .../apache/spark/ml/feature/ANOVASelector.scala| 195 -- .../apache/spark/ml/feature/ChiSqSelector.scala| 1 + .../apache/spark/ml/feature/FValueSelector.scala | 195 -- .../org/apache/spark/ml/feature/Selector.scala | 12 +- .../ml/feature/UnivariateFeatureSelector.scala | 467 ++ .../scala/org/apache/spark/ml/stat/ANOVATest.scala | 2 +- .../org/apache/spark/ml/stat/FValueTest.scala | 2 +- .../spark/ml/feature/ANOVASelectorSuite.scala | 206 --- .../spark/ml/feature/FValueSelectorSuite.scala | 238 --- .../feature/UnivariateFeatureSelectorSuite.scala | 685 + python/docs/source/reference/pyspark.ml.rst| 8 +- python/pyspark/ml/feature.py | 449 +++--- python/pyspark/ml/feature.pyi | 116 ++-- python/pyspark/ml/stat.py | 148 - python/pyspark/ml/stat.pyi | 12 - 29 files changed, 1512 insertions(+), 2025 deletions(-) delete mode 100644 
examples/src/main/java/org/apache/spark/examples/ml/JavaANOVATestExample.java delete mode 100644 examples/src/main/java/org/apache/spark/examples/ml/JavaFValueSelectorExample.java delete mode 100644 examples/src/main/java/org/apache/spark/examples/ml/JavaFValueTestExample.java rename examples/src/main/java/org/apache/spark/examples/ml/{JavaANOVASelectorExample.java => JavaUnivariateFeatureSelectorExample.java} (79%) delete mode 100644 examples/src/main/python/ml/anova_test_example.py delete mode 100644 examples/src/main/python/ml/fvalue_selector_example.py delete mode 100644 examples/src/main/python/ml/fvalue_test_example.py rename examples/src/main/python/ml/{anova_selector_example.py => univariate_feature_selector_example.py} (70%) delete mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/ANOVATestExample.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/FValueSelectorExample.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/FValueTestExample.scala rename examples/src/main/scala/org/apache/spark/examples/ml/{ANOVASelectorExample.scala => UnivariateFeatureSelectorExample.scala} (76%) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/ANOVASelector.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/ANOVASelectorSuite.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/FValueSelectorSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/UnivariateFeatureSelectorSuite.scala
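`UnivariateFeatureSelector` replaces the separate `ANOVASelector` and `FValueSelector` shown as deleted above. It is a single estimator that chooses its statistical test from the declared feature and label types. The plain-Python sketch below illustrates only that dispatch. The helper `choose_score_function` is hypothetical. The type-to-test table is assumed to match the Spark 3.1 ML feature docs (`featureType`/`labelType` values `categorical` and `continuous`); check `docs/ml-features.md` before relying on it.

```python
# Assumed mapping (featureType, labelType) -> univariate test, per the Spark 3.1 docs.
SCORE_FUNCTIONS = {
    ("categorical", "categorical"): "chi-squared",
    ("continuous", "categorical"): "ANOVA F-test",
    ("continuous", "continuous"): "F-value (regression)",
}


def choose_score_function(feature_type, label_type):
    """Hypothetical helper: return the test used for the given type combination."""
    key = (feature_type.lower(), label_type.lower())
    if key not in SCORE_FUNCTIONS:
        # e.g. categorical features with a continuous label: not in the table above.
        raise ValueError(f"unsupported featureType/labelType combination: {key}")
    return SCORE_FUNCTIONS[key]
```

Folding the three cases into one table is what lets one estimator and one example file stand in for the several removed ones.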
[spark] branch branch-3.1 updated: [MINOR][ML] Increase Bounded MLOR (without regularization) test error tolerance
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new b0a70ab [MINOR][ML] Increase Bounded MLOR (without regularization) test error tolerance b0a70ab is described below commit b0a70abed383cedd59f28cec75aa898df0c0b4bd Author: Weichen Xu AuthorDate: Wed Dec 9 11:18:09 2020 +0800 [MINOR][ML] Increase Bounded MLOR (without regularization) test error tolerance ### What changes were proposed in this pull request? Improve LogisticRegression test error tolerance ### Why are the changes needed? When we switch BLAS version, some of the tests will fail due to too strict error tolerance in test. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? N/A Closes #30587 from WeichenXu123/fix_lor_test. Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit f021f6d3c72e1c84637798b4ddcb7e208fdfbf46) Signed-off-by: Weichen Xu --- .../ml/classification/LogisticRegressionSuite.scala | 21 +++-- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index d0b282d..d2814b4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -1548,9 +1548,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { val interceptsExpected1 = Vectors.dense( 1.152482448372, 3.591773288423673, 5.079685953744937) -checkCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected1) +checkBoundedMLORCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected1) assert(model1.interceptVector ~== 
interceptsExpected1 relTol 0.01) -checkCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected1) +checkBoundedMLORCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected1) assert(model2.interceptVector ~== interceptsExpected1 relTol 0.01) // Bound constrained optimization with bound on both side. @@ -1585,9 +1585,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { isTransposed = true) val interceptsExpected3 = Vectors.dense(1.0, 2.0, 2.0) -checkCoefficientsEquivalent(model3.coefficientMatrix, coefficientsExpected3) +checkBoundedMLORCoefficientsEquivalent(model3.coefficientMatrix, coefficientsExpected3) assert(model3.interceptVector ~== interceptsExpected3 relTol 0.01) -checkCoefficientsEquivalent(model4.coefficientMatrix, coefficientsExpected3) +checkBoundedMLORCoefficientsEquivalent(model4.coefficientMatrix, coefficientsExpected3) assert(model4.interceptVector ~== interceptsExpected3 relTol 0.01) // Bound constrained optimization with infinite bound on both side. 
@@ -1621,9 +1621,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { val interceptsExpected5 = Vectors.dense( -2.2231282183460723, 0.3669496747012527, 1.856178543644802) -checkCoefficientsEquivalent(model5.coefficientMatrix, coefficientsExpected5) +checkBoundedMLORCoefficientsEquivalent(model5.coefficientMatrix, coefficientsExpected5) assert(model5.interceptVector ~== interceptsExpected5 relTol 0.01) -checkCoefficientsEquivalent(model6.coefficientMatrix, coefficientsExpected5) +checkBoundedMLORCoefficientsEquivalent(model6.coefficientMatrix, coefficientsExpected5) assert(model6.interceptVector ~== interceptsExpected5 relTol 0.01) } @@ -1719,9 +1719,9 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { 1.7524631428961193, 1.2292565990448736, 1.3433784431904323, 1.5846063017678864), isTransposed = true) -checkCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected) +checkBoundedMLORCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected) assert(model1.interceptVector.toArray === Array.fill(3)(0.0)) -checkCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected) +checkBoundedMLORCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected) assert(model2.interceptVector.toArray === Array.fill(3)(0.0)) } @@ -2953,16 +2953,17 @@ object LogisticRegressionSuite { } /** + * Note: This method is only used in Bounded MLOR (without regularization) test * When no regularization is applied, the multinomial coefficients lack identifiability * because we do not use a pivot class. We can add any constant value
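The identifiability remark in the test's doc comment follows from the softmax. Adding the same vector to every class's coefficient row multiplies the numerator and the denominator of each class probability by the same factor, so predictions and likelihood are unchanged. The plain-Python sketch below shows one way to compare such matrices: center each feature column across classes, then apply a relative tolerance. It is an illustration under that assumption, not the (truncated) body of `checkBoundedMLORCoefficientsEquivalent`. The function names and tolerances are hypothetical.

```python
import math


def softmax_probs(coef_rows, x):
    """Class probabilities for one instance x, given one coefficient row per class."""
    margins = [sum(b * xi for b, xi in zip(row, x)) for row in coef_rows]
    m = max(margins)  # subtract the max margin for numerical stability
    exps = [math.exp(v - m) for v in margins]
    total = sum(exps)
    return [e / total for e in exps]


def center_columns(coef_rows):
    """Subtract each feature column's mean over classes (one canonical representative)."""
    n_classes = len(coef_rows)
    means = [sum(col) / n_classes for col in zip(*coef_rows)]
    return [[b - mu for b, mu in zip(row, means)] for row in coef_rows]


def coefficients_equivalent(a, b, rel_tol=0.05, abs_tol=1e-9):
    """True if two coefficient matrices agree within tolerance after centering."""
    ca, cb = center_columns(a), center_columns(b)
    return all(
        math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol)
        for row_a, row_b in zip(ca, cb)
        for x, y in zip(row_a, row_b)
    )


coef = [[1.0, -2.0], [0.5, 0.3], [-1.5, 1.7]]  # 3 classes x 2 features
shift = [4.0, -1.0]                            # same vector added to every class row
shifted = [[b + s for b, s in zip(row, shift)] for row in coef]
x = [0.7, -1.2]
```

Here `coef` and `shifted` give the same probabilities for `x` and compare as equivalent, even though their raw entries differ.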
[spark] branch master updated (3ac70f1 -> f021f6d)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3ac70f1 [SPARK-33695][BUILD] Upgrade to jackson to 2.10.5 and jackson-databind to 2.10.5.1 add f021f6d [MINOR][ML] Increase Bounded MLOR (without regularization) test error tolerance No new revisions were added by this update. Summary of changes: .../ml/classification/LogisticRegressionSuite.scala | 21 +++-- 1 file changed, 11 insertions(+), 10 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-33592][ML][PYTHON][3.0] Backport Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8acbe5b [SPARK-33592][ML][PYTHON][3.0] Backport Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading 8acbe5b is described below commit 8acbe5b822b7b0ad49079aa223ad52afe70b5afa Author: Weichen Xu AuthorDate: Mon Dec 7 11:42:18 2020 +0800 [SPARK-33592][ML][PYTHON][3.0] Backport Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent. Two typical cases to manually test: ~~~python tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression() pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) paramGrid = ParamGridBuilder() \ .addGrid(hashingTF.numFeatures, [10, 100]) \ .addGrid(lr.maxIter, [100, 200]) \ .build() tvs = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=MulticlassClassificationEvaluator()) tvs.save(tvsPath) loadedTvs = TrainValidationSplit.load(tvsPath) ~~~ ~~~python lr = LogisticRegression() ova = OneVsRest(classifier=lr) grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build() evaluator = MulticlassClassificationEvaluator() tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator) tvs.save(tvsPath) loadedTvs = TrainValidationSplit.load(tvsPath) ~~~ Bug fix. No Unit test. Closes #30539 from WeichenXu123/fix_tuning_param_maps_io. 
Authored-by: Weichen Xu Signed-off-by: Ruifeng Zheng (cherry picked from commit 80161238fe9393aabd5fcd56752ff1e43f6989b1) Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #30590 from WeichenXu123/SPARK-33592-bp-3.0. Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- dev/sparktestsupport/modules.py| 1 + python/pyspark/ml/classification.py| 48 +- python/pyspark/ml/param/__init__.py| 6 +++ python/pyspark/ml/pipeline.py | 53 +--- python/pyspark/ml/tests/test_tuning.py | 47 +++-- python/pyspark/ml/tests/test_util.py | 84 +++ python/pyspark/ml/tuning.py| 92 +++--- python/pyspark/ml/util.py | 38 ++ 8 files changed, 261 insertions(+), 108 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 75bdec0..8705d52 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -548,6 +548,7 @@ pyspark_ml = Module( "pyspark.ml.tests.test_stat", "pyspark.ml.tests.test_training_summary", "pyspark.ml.tests.test_tuning", +"pyspark.ml.tests.test_util", "pyspark.ml.tests.test_wrapper", ], blacklisted_python_implementations=[ diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index c8e15ca..1392bc7 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -27,9 +27,9 @@ from pyspark.ml.tree import _DecisionTreeModel, _DecisionTreeParams, \ _HasVarianceImpurity, _TreeClassifierParams, _TreeEnsembleParams from pyspark.ml.regression import _FactorizationMachinesParams, DecisionTreeRegressionModel from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, \ +from pyspark.ml.wrapper import JavaParams, \ JavaPredictor, _JavaPredictorParams, JavaPredictionModel, JavaWrapper -from pyspark.ml.common import inherit_doc, _java2py, _py2java +from pyspark.ml.common 
import inherit_doc from pyspark.ml.linalg import Vectors from pyspark.sql import DataFrame from pyspark.sql.functions import udf, when @@ -2635,50 +2635,6 @@ class OneVsRest(Estimator, _OneVsRestParams, HasParallelism, JavaMLReadable, Jav _java_obj.setRawPredictionCol(self.getRawPredictionCol()) return _java_obj -def _make_java_param_pair(self, param, value): -""" -Makes
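The fix described in this email resolves each param in `estimatorParamMaps` against every stage nested inside the tuned estimator, not only the top-level one. Examples are the stages of a `Pipeline` or the `classifier` of `OneVsRest`. The plain-Python sketch below illustrates that lookup with hypothetical `Stage`, `Pipeline`, `OneVsRest` and `Param` stand-ins. It is not PySpark's implementation. It assumes only that, as in PySpark, a param records the `uid` of its parent stage.

```python
from dataclasses import dataclass, field
from itertools import count

_ids = count()


@dataclass
class Stage:
    """Hypothetical leaf stage (e.g. a tokenizer or a classifier)."""
    name: str
    uid: str = field(default_factory=lambda: f"stage_{next(_ids)}")


@dataclass
class Pipeline(Stage):
    """Hypothetical meta-stage holding an ordered list of stages."""
    stages: list = field(default_factory=list)


@dataclass
class OneVsRest(Stage):
    """Hypothetical meta-stage wrapping a single base classifier."""
    classifier: Stage = None


@dataclass(frozen=True)
class Param:
    parent: str  # uid of the owning stage (assumed, mirroring PySpark's Param.parent)
    name: str


def all_nested_stages(stage):
    """Depth-first list of the stage and every stage nested inside it."""
    found = [stage]
    if isinstance(stage, Pipeline):
        for child in stage.stages:
            found.extend(all_nested_stages(child))
    elif isinstance(stage, OneVsRest) and stage.classifier is not None:
        found.extend(all_nested_stages(stage.classifier))
    return found


def resolve_param_owners(estimator, param_maps):
    """Map each param in the grid to the nested stage that owns it."""
    uid_map = {s.uid: s for s in all_nested_stages(estimator)}
    owners = {}
    for param_map in param_maps:
        for param in param_map:  # iterating a dict yields its Param keys
            if param.parent not in uid_map:
                raise ValueError(f"no nested stage owns param {param.name}")
            owners[param] = uid_map[param.parent]
    return owners


# Usage mirroring the first manual test case in the commit message.
tokenizer, hashing_tf, lr = Stage("tokenizer"), Stage("hashingTF"), Stage("lr")
pipeline = Pipeline("pipeline", stages=[tokenizer, hashing_tf, lr])
grid = [
    {Param(hashing_tf.uid, "numFeatures"): 10, Param(lr.uid, "maxIter"): 100},
    {Param(hashing_tf.uid, "numFeatures"): 100, Param(lr.uid, "maxIter"): 200},
]
owners = resolve_param_owners(pipeline, grid)
```

Looking only at the top-level `pipeline` would find neither `numFeatures` nor `maxIter`. Walking the nested stages recovers the owning `hashingTF` and `lr` stages.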
[spark] branch master updated (63f9d47 -> 7e759b2)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 63f9d47 [SPARK-33634][SQL][TESTS] Use Analyzer in PlanResolutionSuite add 7e759b2 [SPARK-33520][ML][PYSPARK] make CrossValidator/TrainValidateSplit/OneVsRest Reader/Writer support Python backend estimator/evaluator No new revisions were added by this update. Summary of changes: python/pyspark/ml/classification.py | 128 +- python/pyspark/ml/classification.pyi| 31 ++- python/pyspark/ml/tests/test_persistence.py | 14 +- python/pyspark/ml/tests/test_tuning.py | 97 +--- python/pyspark/ml/tuning.py | 357 ++-- python/pyspark/ml/tuning.pyi| 40 python/pyspark/ml/util.py | 42 +++- python/pyspark/ml/util.pyi | 2 + python/pyspark/testing/mlutils.py | 87 +++ 9 files changed, 739 insertions(+), 59 deletions(-)
[spark] branch master updated (a180e02 -> 689c294)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from a180e02 [SPARK-32852][SQL][DOC][FOLLOWUP] Revise the documentation of spark.sql.hive.metastore.jars add 689c294 [SPARK-32907][ML][PYTHON] Adaptively blockify instances - AFT,LiR,LoR No new revisions were added by this update. Summary of changes: .../apache/spark/ml/classification/LinearSVC.scala | 6 +- .../ml/classification/LogisticRegression.scala | 114 + .../spark/ml/optim/aggregator/AFTAggregator.scala | 42 +++ .../ml/optim/aggregator/HingeAggregator.scala | 6 +- .../ml/optim/aggregator/HuberAggregator.scala | 11 +- .../optim/aggregator/LeastSquaresAggregator.scala | 11 +- .../ml/optim/aggregator/LogisticAggregator.scala | 19 +-- .../ml/param/shared/SharedParamsCodeGen.scala | 4 +- .../spark/ml/param/shared/sharedParams.scala | 4 +- .../ml/regression/AFTSurvivalRegression.scala | 130 +++ .../spark/ml/regression/LinearRegression.scala | 138 ++--- .../mllib/classification/LogisticRegression.scala | 4 +- .../classification/LogisticRegressionSuite.scala | 8 +- .../ml/regression/AFTSurvivalRegressionSuite.scala | 4 +- .../ml/regression/LinearRegressionSuite.scala | 4 +- python/pyspark/ml/classification.py| 22 ++-- python/pyspark/ml/classification.pyi | 8 +- python/pyspark/ml/param/_shared_params_code_gen.py | 4 +- python/pyspark/ml/param/shared.py | 4 +- python/pyspark/ml/regression.py| 46 +++ python/pyspark/ml/regression.pyi | 18 +-- 21 files changed, 237 insertions(+), 370 deletions(-)
[spark] branch master updated (4335af0 -> a288716)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4335af0  [MINOR][DOC] spark.executor.memoryOverhead is not cluster-mode only
     add a288716  [SPARK-32907][ML][PYTHON] adaptively blockify instances - LinearSVC

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/ml/classification/LinearSVC.scala | 93 +++---
 .../org/apache/spark/ml/feature/Instance.scala | 71 +
 .../ml/param/shared/SharedParamsCodeGen.scala | 7 +-
 .../spark/ml/param/shared/sharedParams.scala | 18 +
 .../spark/ml/classification/LinearSVCSuite.scala | 4 +-
 .../apache/spark/ml/feature/InstanceSuite.scala | 54 +
 python/pyspark/ml/classification.py | 26 +++---
 python/pyspark/ml/classification.pyi | 9 ++-
 python/pyspark/ml/param/_shared_params_code_gen.py | 6 +-
 python/pyspark/ml/param/shared.py | 18 +
 python/pyspark/ml/param/shared.pyi | 5 ++
 11 files changed, 224 insertions(+), 87 deletions(-)
[spark] 01/02: init
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch fix_pipeline_tuning
in repository https://gitbox.apache.org/repos/asf/spark.git

commit c834fe8f335dc74db6346d82b5ce4cf742cba9bb
Author: Weichen Xu
AuthorDate: Mon Apr 20 17:04:12 2020 +0800

    init
---
 python/pyspark/ml/pipeline.py | 46 +--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 09e0748..0004b64 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext
 from pyspark.ml.base import Estimator, Model, Transformer
 from pyspark.ml.param import Param, Params
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaParams
-from pyspark.ml.common import inherit_doc
+from pyspark.ml.wrapper import JavaParams, JavaWrapper
+from pyspark.ml.common import inherit_doc, _java2py, _py2java


 @inherit_doc
@@ -174,6 +174,48 @@ class Pipeline(Estimator, MLReadable, MLWritable):

         return _java_obj

+    def _make_java_param_pair(self, param, value):
+        """
+        Makes a Java param pair.
+        """
+        sc = SparkContext._active_spark_context
+        param = self._resolveParam(param)
+        java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent, param.name, param.doc)
+        if isinstance(value, JavaParams):
+            # used in the case of an estimator having another estimator as a parameter
+            # the reason why this is not in _py2java in common.py is that importing
+            # Estimator and Model in common.py results in a circular import with inherit_doc
+            java_value = value._to_java()
+        else:
+            java_value = _py2java(sc, value)
+        return java_param.w(java_value)
+
+    def _transfer_param_map_to_java(self, pyParamMap):
+        """
+        Transforms a Python ParamMap into a Java ParamMap.
+        """
+        paramMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
+        for param in self.params:
+            if param in pyParamMap:
+                pair = self._make_java_param_pair(param, pyParamMap[param])
+                paramMap.put([pair])
+        return paramMap
+
+    def _transfer_param_map_from_java(self, javaParamMap):
+        """
+        Transforms a Java ParamMap into a Python ParamMap.
+        """
+        sc = SparkContext._active_spark_context
+        paramMap = dict()
+        for pair in javaParamMap.toList():
+            param = pair.param()
+            if self.hasParam(str(param.name())):
+                if param.name() == "classifier":
+                    paramMap[self.getParam(param.name())] = JavaParams._from_java(pair.value())
+                else:
+                    paramMap[self.getParam(param.name())] = _java2py(sc, pair.value())
+        return paramMap
+

 @inherit_doc
 class PipelineWriter(MLWriter):
[spark] branch fix_pipeline_tuning created (now 2af493a)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch fix_pipeline_tuning
in repository https://gitbox.apache.org/repos/asf/spark.git.

        at 2af493a  init

This branch includes the following new commits:

     new c834fe8  init
     new 2af493a  init

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[spark] 02/02: init
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch fix_pipeline_tuning
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 2af493a016093d8499bf02bc37bc3e842ed74c62
Author: Weichen Xu
AuthorDate: Mon Apr 20 17:47:35 2020 +0800

    init
---
 python/pyspark/ml/pipeline.py | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 0004b64..3a71632 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -210,10 +210,12 @@ class Pipeline(Estimator, MLReadable, MLWritable):
         for pair in javaParamMap.toList():
             param = pair.param()
             if self.hasParam(str(param.name())):
-                if param.name() == "classifier":
-                    paramMap[self.getParam(param.name())] = JavaParams._from_java(pair.value())
+                java_obj = pair.value()
+                if sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj):
+                    py_obj = JavaParams._from_java(java_obj)
                 else:
-                    paramMap[self.getParam(param.name())] = _java2py(sc, pair.value())
+                    py_obj = _java2py(sc, java_obj)
+                paramMap[self.getParam(param.name())] = py_obj
         return paramMap
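Commit 02/02 replaces the hard-coded `param.name() == "classifier"` check with a type test. Any value that is a JVM `org.apache.spark.ml.PipelineStage` is wrapped with `JavaParams._from_java`, and every other value goes through `_java2py`. The pure-Python sketch below imitates that dispatch without a JVM. Every name in it is a hypothetical stand-in, not a PySpark API: `JavaPipelineStage`, `JavaLogisticRegression`, `PyStageWrapper`, `java2py` and `transfer_param_map_from_java`. The sketch also assumes params can be keyed by their name string rather than by `Param` object, which keeps it short.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Tuple


class JavaPipelineStage:
    """Hypothetical stand-in for a JVM org.apache.spark.ml.PipelineStage."""


class JavaLogisticRegression(JavaPipelineStage):
    """Hypothetical stand-in for a JVM estimator stored inside a param."""


@dataclass
class PyStageWrapper:
    """Stands in for the Python object JavaParams._from_java would build."""
    java_obj: Any


def java2py(value: Any) -> Any:
    """Stands in for _java2py; plain values are passed through unchanged here."""
    return value


def transfer_param_map_from_java(pairs: Iterable[Tuple[str, Any]]) -> Dict[str, Any]:
    """Convert (param name, JVM value) pairs into a Python param map."""
    py_map: Dict[str, Any] = {}
    for name, java_obj in pairs:
        # Dispatch on the value's type rather than the param's name, so a
        # stage held by any param ("classifier", "estimator", ...) is wrapped.
        if isinstance(java_obj, JavaPipelineStage):
            py_obj = PyStageWrapper(java_obj)
        else:
            py_obj = java2py(java_obj)
        py_map[name] = py_obj
    return py_map
```

Consider the pairs `("classifier", lr)`, `("estimator", lr)` and `("maxIter", 10)`, where `lr` is a stage. Under the type check, both stage-valued entries are wrapped and `maxIter` passes through. Under the old name-based check, the `"estimator"` entry would have gone to the plain converter.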
[spark] branch branch-3.0 updated: Revert "[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset"
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cb890d9  Revert "[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset"
cb890d9 is described below

commit cb890d96bc38860988dba97efaf6d88cc8c09288
Author: WeichenXu
AuthorDate: Tue Feb 18 10:41:49 2020 +0800

    Revert "[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset"

    This reverts commit ba9141592d0f0ce23c207efb21ae84ac7cc4670a.
---
 python/pyspark/sql/dataframe.py | 46 --
 python/pyspark/sql/tests/test_dataframe.py | 5 ---
 .../main/scala/org/apache/spark/sql/Dataset.scala | 28 -
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 15 ---
 4 files changed, 94 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8325b68..2432b81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2153,52 +2153,6 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
                                               "should have been DataFrame." % type(result)
         return result

-    @since(3.1)
-    def sameSemantics(self, other):
-        """
-        Returns `True` when the logical query plans inside both :class:`DataFrame`\\s are equal and
-        therefore return same results.
-
-        .. note:: The equality comparison here is simplified by tolerating the cosmetic differences
-            such as attribute names.
-
-        .. note:: This API can compare both :class:`DataFrame`\\s very fast but can still return
-            `False` on the :class:`DataFrame` that return the same results, for instance, from
-            different plans. Such false negative semantic can be useful when caching as an example.
-
-        .. note:: DeveloperApi
-
-        >>> df1 = spark.range(10)
-        >>> df2 = spark.range(10)
-        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id * 2))
-        True
-        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id + 2))
-        False
-        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col0", df2.id * 2))
-        True
-        """
-        if not isinstance(other, DataFrame):
-            raise ValueError("other parameter should be of DataFrame; however, got %s"
-                             % type(other))
-        return self._jdf.sameSemantics(other._jdf)
-
-    @since(3.1)
-    def semanticHash(self):
-        """
-        Returns a hash code of the logical query plan against this :class:`DataFrame`.
-
-        .. note:: Unlike the standard hash code, the hash is calculated against the query plan
-            simplified by tolerating the cosmetic differences such as attribute names.
-
-        .. note:: DeveloperApi
-
-        >>> spark.range(10).selectExpr("id as col0").semanticHash()  # doctest: +SKIP
-        1855039936
-        >>> spark.range(10).selectExpr("id as col1").semanticHash()  # doctest: +SKIP
-        1855039936
-        """
-        return self._jdf.semanticHash()
-
     where = copy_func(
         filter,
         sinceversion=1.3,
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 942cd4b..d738449 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -782,11 +782,6 @@ class DataFrameTests(ReusedSQLTestCase):
                     break
             self.assertEqual(df.take(8), result)

-    def test_same_semantics_error(self):
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(ValueError, "should be of DataFrame.*int"):
-                self.spark.range(10).sameSemantics(1)
-

 class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
     # These tests are separate because it uses 'spark.sql.queryExecutionListeners' which is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5cd2583..42f3535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3310,34 +3310,6 @@ class Dataset[T] private[sql](
     files.toSet.toArray
   }

-  /**
-   * Returns `true` when the logical query plans inside both [[Dataset]]s are equal and
-   * therefore return same results.
-   *
-   * @note The equality comp
[spark] branch master updated (4ed9b88 -> d8c0599)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4ed9b88  [SPARK-30832][DOCS] SQL function doc headers should link to anchors
     add d8c0599  [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/dataframe.py | 46 ++
 python/pyspark/sql/tests/test_dataframe.py | 5 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala | 28 +
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 15 +++
 4 files changed, 94 insertions(+)
[spark] branch branch-3.0 updated: [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ba91415  [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset
ba91415 is described below

commit ba9141592d0f0ce23c207efb21ae84ac7cc4670a
Author: Liang Zhang
AuthorDate: Tue Feb 18 09:22:26 2020 +0800

    [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset

    ### What changes were proposed in this pull request?
    This PR added two DeveloperApis to the Dataset[T] class. Both methods are just exposing lower-level methods to the Dataset[T] class.

    ### Why are the changes needed?
    They are useful for checking whether two dataframes are the same when implementing dataframe caching in python, and also get a unique ID. It's easier to use if we wrap the lower-level APIs.

    ### Does this PR introduce any user-facing change?
    ```
    scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
    df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

    scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
    df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

    scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
    df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

    scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
    df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]

    scala> df1.semanticHash
    res0: Int = 594427822

    scala> df2.semanticHash
    res1: Int = 594427822

    scala> df1.sameSemantics(df2)
    res2: Boolean = true

    scala> df1.sameSemantics(df3)
    res3: Boolean = false

    scala> df3.semanticHash
    res4: Int = -1592702048

    scala> df4.semanticHash
    res5: Int = -1592702048

    scala> df4.sameSemantics(df3)
    res6: Boolean = true
    ```

    ### How was this patch tested?
    Unit test in scala and doctest in python.

    Note: comments are copied from the corresponding lower-level APIs.
    Note: There are some issues to be fixed that would improve the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028

    Closes #27565 from liangz1/df-same-result.

    Authored-by: Liang Zhang
    Signed-off-by: WeichenXu
    (cherry picked from commit d8c0599e542976ef70b60bc673e7c9732fce49e5)
    Signed-off-by: WeichenXu
---
 python/pyspark/sql/dataframe.py | 46 ++
 python/pyspark/sql/tests/test_dataframe.py | 5 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala | 28 +
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 15 +++
 4 files changed, 94 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2432b81..8325b68 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2153,6 +2153,52 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
                                               "should have been DataFrame." % type(result)
         return result

+    @since(3.1)
+    def sameSemantics(self, other):
+        """
+        Returns `True` when the logical query plans inside both :class:`DataFrame`\\s are equal and
+        therefore return same results.
+
+        .. note:: The equality comparison here is simplified by tolerating the cosmetic differences
+            such as attribute names.
+
+        .. note:: This API can compare both :class:`DataFrame`\\s very fast but can still return
+            `False` on the :class:`DataFrame` that return the same results, for instance, from
+            different plans. Such false negative semantic can be useful when caching as an example.
+
+        .. note:: DeveloperApi
+
+        >>> df1 = spark.range(10)
+        >>> df2 = spark.range(10)
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id * 2))
+        True
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id + 2))
+        False
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col0", df2.id * 2))
+        True
+        """
+        if not isinstance(other, DataFrame):
+            raise ValueError("other parameter should be of DataFrame; however, got %s"
+                             % type(other))
+        return self._jdf.sameSemantics(other._jdf)
+
+    @since(3.1)
+    def semanticHash(self):
+        """
+        Ret
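The `sameSemantics` docstring says the comparison tolerates cosmetic differences such as attribute names. That is why renaming `col1` to `col0` still compares equal, while changing `* 2` to `+ 2` does not. The toy sketch below illustrates only that idea. It is not Spark's Catalyst canonicalization. `Col`, `Lit`, `BinOp`, `Alias`, `canonicalize`, `same_semantics` and `semantic_hash` are hypothetical names. The sketch assumes a tiny expression tree in which output aliases are stripped before comparing or hashing.

```python
from dataclasses import dataclass
from typing import Union

# Toy expression tree for illustration only; this is NOT Spark's Catalyst
# canonicalization, just the "ignore cosmetic aliases" idea in miniature.


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Lit:
    value: int


@dataclass(frozen=True)
class BinOp:
    op: str
    left: "Expr"
    right: "Expr"


@dataclass(frozen=True)
class Alias:
    child: "Expr"
    name: str  # output column name, treated as cosmetic


Expr = Union[Col, Lit, BinOp, Alias]


def canonicalize(e: Expr) -> Expr:
    """Strip aliases recursively; every other node is kept as-is."""
    if isinstance(e, Alias):
        return canonicalize(e.child)
    if isinstance(e, BinOp):
        return BinOp(e.op, canonicalize(e.left), canonicalize(e.right))
    return e


def same_semantics(a: Expr, b: Expr) -> bool:
    return canonicalize(a) == canonicalize(b)


def semantic_hash(e: Expr) -> int:
    # Frozen dataclasses are hashable, so equal canonical forms hash equally.
    return hash(canonicalize(e))
```

The sketch mirrors the docstring. `Alias(BinOp("*", Col("id"), Lit(2)), "col1")` and the same expression aliased `"col0"` compare equal and hash equally, while the `"+"` variant does not. The toy does no further normalization; for example, it does not reorder commutative operands. As a result, `2 * id` and `id * 2` compare unequal here. This is the kind of false negative the docstring warns the real API can also return for some plans.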
[spark] branch branch-3.0 updated: [SPARK-30762] Add dtype=float32 support to vector_to_array UDF
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 074712e  [SPARK-30762] Add dtype=float32 support to vector_to_array UDF
074712e is described below

commit 074712e329b347f769f8c009949c7845e95b3212
Author: Liang Zhang
AuthorDate: Thu Feb 13 23:55:13 2020 +0800

    [SPARK-30762] Add dtype=float32 support to vector_to_array UDF

    ### What changes were proposed in this pull request?
    In this PR, we add a parameter in the python function vector_to_array(col) that allows converting to a column of arrays of Float (32bits) in scala, which would be mapped to a numpy array of dtype=float32.

    ### Why are the changes needed?
    In the downstream ML training, using float32 instead of float64 (default) would allow a larger batch size, i.e., allow more data to fit in the memory.

    ### Does this PR introduce any user-facing change?
    Yes.
    Old: `vector_to_array()` only take one param
    ```
    df.select(vector_to_array("colA"), ...)
    ```
    New: `vector_to_array()` can take an additional optional param: `dtype` = "float32" (or "float64")
    ```
    df.select(vector_to_array("colA", "float32"), ...)
    ```

    ### How was this patch tested?
    Unit test in scala.
    doctest in python.

    Closes #27522 from liangz1/udf-float32.

    Authored-by: Liang Zhang
    Signed-off-by: WeichenXu
    (cherry picked from commit 82d0aa37ae521231d8067e473c6ea79a118a115a)
    Signed-off-by: WeichenXu
---
 .../main/scala/org/apache/spark/ml/functions.scala | 34 +++---
 .../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++---
 python/pyspark/ml/functions.py | 27 +
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
index 1faf562..0f03231 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/functions.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml

 import org.apache.spark.annotation.Since
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{SparseVector, Vector}
 import org.apache.spark.mllib.linalg.{Vector => OldVector}
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.udf
@@ -27,7 +27,6 @@ import org.apache.spark.sql.functions.udf
 @Since("3.0.0")
 object functions {
 // scalastyle:on
-
   private val vectorToArrayUdf = udf { vec: Any =>
     vec match {
       case v: Vector => v.toArray
@@ -39,10 +38,37 @@ object functions {
     }
   }.asNonNullable()

+  private val vectorToArrayFloatUdf = udf { vec: Any =>
+    vec match {
+      case v: SparseVector =>
+        val data = new Array[Float](v.size)
+        v.foreachActive { (index, value) => data(index) = value.toFloat }
+        data
+      case v: Vector => v.toArray.map(_.toFloat)
+      case v: OldVector => v.toArray.map(_.toFloat)
+      case v => throw new IllegalArgumentException(
+        "function vector_to_array requires a non-null input argument and input type must be " +
+        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
+        s"but got ${ if (v == null) "null" else v.getClass.getName }.")
+    }
+  }.asNonNullable()
+
   /**
    * Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
-   *
+   * @param v: the column of MLlib sparse/dense vectors
+   * @param dtype: the desired underlying data type in the returned array
+   * @return an array<float> if dtype is float32, or array<double> if dtype is float64
    * @since 3.0.0
    */
-  def vector_to_array(v: Column): Column = vectorToArrayUdf(v)
+  def vector_to_array(v: Column, dtype: String = "float64"): Column = {
+    if (dtype == "float64") {
+      vectorToArrayUdf(v)
+    } else if (dtype == "float32") {
+      vectorToArrayFloatUdf(v)
+    } else {
+      throw new IllegalArgumentException(
+        s"Unsupported dtype: $dtype. Valid values: float64, float32."
+      )
+    }
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
index 2f5062c..3dd9a7d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
@@ -34,9 +34,8 @@ class FunctionsSuite extends MLTest {
       (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
     ).toDF("vec", "oldVec")
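The new `vectorToArrayFloatUdf` starts from a zero-initialized `Array[Float]` and writes in only a sparse vector's active entries via `foreachActive`. Dense values are mapped with `toFloat`. Separately, `vector_to_array` rejects any `dtype` other than `float64` or `float32`. The pure-Python sketch below mirrors that logic with the standard `array` module, which makes the per-element memory difference visible. `SparseVec` and `to_dense_array` are hypothetical helpers, not `pyspark.ml.linalg` types, and a `ValueError` stands in for Scala's `IllegalArgumentException`.

```python
from array import array
from typing import Sequence, Union


class SparseVec:
    """Hypothetical sparse vector: a size plus parallel index/value lists."""

    def __init__(self, size: int, indices: Sequence[int], values: Sequence[float]):
        self.size = size
        self.indices = list(indices)
        self.values = list(values)


def to_dense_array(vec: Union[SparseVec, Sequence[float]], dtype: str = "float64") -> array:
    """Densify a vector into an array of 64-bit ("d") or 32-bit ("f") floats."""
    if dtype == "float64":
        typecode = "d"
    elif dtype == "float32":
        typecode = "f"
    else:
        raise ValueError(f"Unsupported dtype: {dtype}. Valid values: float64, float32.")

    if isinstance(vec, SparseVec):
        # Start from zeros and write only the active (stored) entries,
        # like the foreachActive loop in the Scala UDF.
        out = array(typecode, [0.0]) * vec.size
        for index, value in zip(vec.indices, vec.values):
            out[index] = value
        return out
    # Dense input: convert every element.
    return array(typecode, vec)
```

For example, `to_dense_array(SparseVec(3, [0, 2], [2.0, 3.0]), "float32")` yields `array('f', [2.0, 0.0, 3.0])`. Its `itemsize` is typically 4 bytes per element, against 8 for `"float64"`. That halving is the memory saving the commit message cites as the reason for allowing larger batch sizes.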
[spark] branch master updated (fb0e07b -> 82d0aa3)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from fb0e07b  [SPARK-29231][SQL] Constraints should be inferred from cast equality constraint
     add 82d0aa3  [SPARK-30762] Add dtype=float32 support to vector_to_array UDF

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/ml/functions.scala | 34 +++---
 .../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++---
 python/pyspark/ml/functions.py | 27 +
 3 files changed, 81 insertions(+), 13 deletions(-)
[spark] branch master updated (79b10a1 -> 104b9b6)
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 79b10a1 [SPARK-28929][CORE] Spark Logging level should be INFO instead of DEBUG in Executor Plugin API
     add 104b9b6 [SPARK-28483][FOLLOW-UP] Fix flaky test in BarrierTaskContextSuite

No new revisions were added by this update.

Summary of changes:
 .../spark/scheduler/BarrierTaskContextSuite.scala | 72 +-
 1 file changed, 30 insertions(+), 42 deletions(-)
[spark-website] branch asf-site updated: Add Weichen Xu to committer list
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 7ade5ef  Add Weichen Xu to committer list
7ade5ef is described below

commit 7ade5efec5fca08252837d5b2e2bb2bee0494947
Author: WeichenXu
AuthorDate: Tue Sep 10 17:36:55 2019 +0800

    Add Weichen Xu to committer list

    Add Weichen Xu to committer list.

    Author: WeichenXu

    Closes #222 from WeichenXu123/add_weichen_committer.
---
 committers.md        | 1 +
 site/committers.html | 4
 2 files changed, 5 insertions(+)

diff --git a/committers.md b/committers.md
index d8ba384..2627f36 100644
--- a/committers.md
+++ b/committers.md
@@ -78,6 +78,7 @@ navigation:
 |Patrick Wendell|Databricks|
 |Andrew Xia|Alibaba|
 |Reynold Xin|Databricks|
+|Weichen Xu|Databricks|
 |Takeshi Yamamuro|NTT|
 |Burak Yavuz|Databricks|
 |Matei Zaharia|Databricks, Stanford|
diff --git a/site/committers.html b/site/committers.html
index 1c950ff..f5da5ab 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -483,6 +483,10 @@
 Databricks
+ Weichen Xu
+ Databricks
+
+
 Takeshi Yamamuro
 NTT