(spark) branch master updated: [SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in spark ML reader/writer

2024-07-23 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fba4c8c20e52 [SPARK-48970][PYTHON][ML] Avoid using 
SparkSession.getActiveSession in spark ML reader/writer
fba4c8c20e52 is described below

commit fba4c8c20e523c9a441f007442efd616320e7be4
Author: Weichen Xu 
AuthorDate: Tue Jul 23 19:19:28 2024 +0800

[SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in 
spark ML reader/writer

### What changes were proposed in this pull request?

`SparkSession.getActiveSession` returns a thread-local session, but the Spark ML 
reader / writer might be executed in a different thread, which causes 
`SparkSession.getActiveSession` to return None.

### Why are the changes needed?

It fixes bugs like the following:
```
spark = SparkSession.getActiveSession()
>   spark.createDataFrame(  # type: ignore[union-attr]
[(metadataJson,)], schema=["value"]
).coalesce(1).write.text(metadataPath)
E   AttributeError: 'NoneType' object has no attribute 'createDataFrame'
```
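
A minimal sketch of why this happens (a hypothetical standalone repro, assuming
a local Spark installation): the active session is tracked per thread, so a
thread that never activated a session can see `None`.
```
import threading
from pyspark.sql import SparkSession

# Creating the session on the main thread also makes it "active" there.
spark = SparkSession.builder.master("local[1]").getOrCreate()

def reader_job():
    # This worker thread never activated a session, so the thread-local
    # lookup can come back empty and any chained call fails as shown above.
    active = SparkSession.getActiveSession()
    print(active)  # may print None

t = threading.Thread(target=reader_job)
t.start()
t.join()
```
The fix avoids that lookup: the Scala reader rebuilds the session from the
`SparkContext` it already has (`SparkSession.builder().sparkContext(sc).getOrCreate()`),
and the Python side switches to `SparkSession._getActiveSessionOrCreate()`.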

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47453 from WeichenXu123/SPARK-48970.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../src/main/scala/org/apache/spark/ml/util/ReadWrite.scala  |  2 +-
 python/pyspark/ml/util.py| 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 021595f76c24..c127575e1470 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -588,7 +588,7 @@ private[ml] object DefaultParamsReader {
*/
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = 
""): Metadata = {
 val metadataPath = new Path(path, "metadata").toString
-val spark = SparkSession.getActiveSession.get
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
 val metadataStr = spark.read.text(metadataPath).first().getString(0)
 parseMetadata(metadataStr, expectedClassName)
   }
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index 5e7965554d82..89e2f9631564 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -464,10 +464,10 @@ class DefaultParamsWriter(MLWriter):
 metadataJson = DefaultParamsWriter._get_metadata_to_save(
 instance, sc, extraMetadata, paramMap
 )
-spark = SparkSession.getActiveSession()
-spark.createDataFrame(  # type: ignore[union-attr]
-[(metadataJson,)], schema=["value"]
-).coalesce(1).write.text(metadataPath)
+spark = SparkSession._getActiveSessionOrCreate()
+spark.createDataFrame([(metadataJson,)], 
schema=["value"]).coalesce(1).write.text(
+metadataPath
+)
 
 @staticmethod
 def _get_metadata_to_save(
@@ -580,8 +580,8 @@ class DefaultParamsReader(MLReader[RL]):
 If non empty, this is checked against the loaded metadata.
 """
 metadataPath = os.path.join(path, "metadata")
-spark = SparkSession.getActiveSession()
-metadataStr = spark.read.text(metadataPath).first()[0]  # type: 
ignore[union-attr,index]
+spark = SparkSession._getActiveSessionOrCreate()
+metadataStr = spark.read.text(metadataPath).first()[0]  # type: 
ignore[index]
 loadedVals = DefaultParamsReader._parseMetaData(metadataStr, 
expectedClassName)
 return loadedVals
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48941][PYTHON][ML] Replace RDD read / write API invocation with Dataframe read / write API

2024-07-22 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new be1afd504a44 [SPARK-48941][PYTHON][ML] Replace RDD read / write API 
invocation with Dataframe read / write API
be1afd504a44 is described below

commit be1afd504a44ea6058f764e0adf7140eedf704db
Author: Weichen Xu 
AuthorDate: Mon Jul 22 21:18:54 2024 +0800

[SPARK-48941][PYTHON][ML] Replace RDD read / write API invocation with 
Dataframe read / write API

### What changes were proposed in this pull request?

PySpark ML: Replace RDD read / write API invocation with DataFrame read / 
write API
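
A hedged sketch of the substitution (the metadata payload and path are
placeholders): the single-line metadata file previously written through the
RDD API is now produced and read back with the DataFrame writer / reader.
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
sc = spark.sparkContext

metadata_json = '{"class": "example"}'   # placeholder metadata payload
metadata_path = "/tmp/example-metadata"  # placeholder location

# Old style (RDD API):
# sc.parallelize([metadata_json], 1).saveAsTextFile(metadata_path)

# New style (DataFrame API), as in the diff below:
spark.createDataFrame([(metadata_json,)], schema=["value"]) \
    .coalesce(1).write.text(metadata_path)

# Reading it back with the DataFrame reader:
metadata_str = spark.read.text(metadata_path).first()[0]
```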

### Why are the changes needed?

Follow-up of https://github.com/apache/spark/pull/47341

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47411 from WeichenXu123/SPARK-48909-follow-up.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/util.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index b9a2829a1ca0..5e7965554d82 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -464,7 +464,10 @@ class DefaultParamsWriter(MLWriter):
 metadataJson = DefaultParamsWriter._get_metadata_to_save(
 instance, sc, extraMetadata, paramMap
 )
-sc.parallelize([metadataJson], 1).saveAsTextFile(metadataPath)
+spark = SparkSession.getActiveSession()
+spark.createDataFrame(  # type: ignore[union-attr]
+[(metadataJson,)], schema=["value"]
+).coalesce(1).write.text(metadataPath)
 
 @staticmethod
 def _get_metadata_to_save(
@@ -577,7 +580,8 @@ class DefaultParamsReader(MLReader[RL]):
 If non empty, this is checked against the loaded metadata.
 """
 metadataPath = os.path.join(path, "metadata")
-metadataStr = sc.textFile(metadataPath, 1).first()
+spark = SparkSession.getActiveSession()
+metadataStr = spark.read.text(metadataPath).first()[0]  # type: 
ignore[union-attr,index]
 loadedVals = DefaultParamsReader._parseMetaData(metadataStr, 
expectedClassName)
 return loadedVals
 





(spark) branch master updated: [SPARK-48463][ML] Make StringIndexer supporting nested input columns

2024-07-15 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9bff2c8bc505 [SPARK-48463][ML] Make StringIndexer supporting nested 
input columns
9bff2c8bc505 is described below

commit 9bff2c8bc5059f5be0dc6e8105c11403942a0b9f
Author: Weichen Xu 
AuthorDate: Mon Jul 15 15:19:59 2024 +0800

[SPARK-48463][ML] Make StringIndexer supporting nested input columns

### What changes were proposed in this pull request?

Make StringIndexer support nested input columns
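
A hedged usage sketch of the new capability (the column and field names are
made up, and it assumes nested fields are referenced with dot notation, which
is how `dataset.col` resolves them in the change below):
```
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [(("a",),), (("b",),), (("a",),)],
    "info struct<category: string>",
)

# Before this change only top-level column names were accepted; now a nested
# field can be used directly as the input column.
indexer = StringIndexer(inputCol="info.category", outputCol="categoryIdx")
indexed = indexer.fit(df).transform(df)
```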

### Why are the changes needed?

User demand.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes #47283 from WeichenXu123/SPARK-48463.

Lead-authored-by: Weichen Xu 
Co-authored-by: WeichenXu 
Signed-off-by: Weichen Xu 
---
 .../apache/spark/ml/feature/StringIndexer.scala| 37 +++--
 .../spark/ml/feature/StringIndexerSuite.scala  | 47 +-
 2 files changed, 71 insertions(+), 13 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 60dc4d024071..34f77f029395 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.feature
 
+import java.util.ArrayList
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
@@ -27,7 +29,7 @@ import org.apache.spark.ml.attribute.{Attribute, 
NominalAttribute}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, Encoders, 
Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
Encoder, Encoders, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{If, Literal}
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.functions._
@@ -103,8 +105,8 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   private def validateAndTransformField(
   schema: StructType,
   inputColName: String,
+  inputDataType: DataType,
   outputColName: String): StructField = {
-val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or numeric 
type, " +
 s"but got $inputDataType.")
@@ -122,12 +124,22 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
 require(outputColNames.distinct.length == outputColNames.length,
   s"Output columns should not be duplicate.")
 
+val sparkSession = SparkSession.getActiveSession.get
+val transformDataset = sparkSession.createDataFrame(new ArrayList[Row](), 
schema = schema)
 val outputFields = inputColNames.zip(outputColNames).flatMap {
   case (inputColName, outputColName) =>
-schema.fieldNames.contains(inputColName) match {
-  case true => Some(validateAndTransformField(schema, inputColName, 
outputColName))
-  case false if skipNonExistsCol => None
-  case _ => throw new SparkException(s"Input column $inputColName does 
not exist.")
+try {
+  val dtype = transformDataset.col(inputColName).expr.dataType
+  Some(
+validateAndTransformField(schema, inputColName, dtype, 
outputColName)
+  )
+} catch {
+  case _: AnalysisException =>
+if (skipNonExistsCol) {
+  None
+} else {
+  throw new SparkException(s"Input column $inputColName does not 
exist.")
+}
 }
 }
 StructType(schema.fields ++ outputFields)
@@ -431,11 +443,8 @@ class StringIndexerModel (
   val labelToIndex = labelsToIndexArray(i)
   val labels = labelsArray(i)
 
-  if (!dataset.schema.fieldNames.contains(inputColName)) {
-logWarning(log"Input column ${MDC(LogKeys.COLUMN_NAME, inputColName)} 
does not exist " +
-  log"during transformation. Skip StringIndexerModel for this column.")
-outputColNames(i) = null
-  } else {
+  try {
+dataset.col(inputColName)
 val filteredLabels = getHandleInvalid match {
   case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
   case _ => labels
@@ -449,9 +458,13 @@ c

(spark) branch master updated: [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API

2024-07-12 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0fa5787d0a6b [SPARK-48883][ML][R] Replace RDD read / write API 
invocation with Dataframe read / write API
0fa5787d0a6b is described below

commit 0fa5787d0a6bd17ccd05ff561bc8dfa88af03312
Author: Weichen Xu 
AuthorDate: Fri Jul 12 22:20:37 2024 +0800

[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe 
read / write API

### What changes were proposed in this pull request?

Replace RDD read / write API invocation with Dataframe read / write API

### Why are the changes needed?

In the Databricks runtime, the RDD read / write API has issues with certain 
storage types that require an account key, but the DataFrame read / write API 
works.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47328 from WeichenXu123/ml-df-writer-save-2.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala  | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala   | 7 +--
 .../main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 +--
 .../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala | 7 +--
 .../org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala  | 7 +--
 .../main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala| 7 +--
 .../src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala  | 4 +++-
 .../main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala   | 7 +--
 .../main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala| 7 +--
 .../main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 +--
 .../apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala| 7 +--
 .../scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala   | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala| 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala   | 7 +--
 .../scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 +--
 .../scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala   | 7 +--
 .../apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +++-
 .../src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala  | 7 +--
 mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala| 3 ++-
 .../org/apache/spark/ml/r/RandomForestClassifierWrapper.scala | 8 ++--
 .../org/apache/spark/ml/r/RandomForestRegressorWrapper.scala  | 8 ++--
 mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 6 --
 24 files changed, 114 insertions(+), 45 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 7eef3ced422e..67057b3fcef6 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends 
MLReadable[AFTSurvivalReg
   val rMetadata = ("class" -> instance.getClass.getName) ~
 ("features" -> instance.features.toImmutableArraySeq)
   val rMetadataJson: String = compact(render(rMetadata))
-  sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+  sparkSession.createDataFrame(
+Seq(Tuple1(rMetadataJson))
+  ).repartition(1).write.text(rMetadataPath)
 
   instance.pipeline.save(pipelinePath)
 }
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends 
MLReadable[AFTSurvivalReg
   val rMetadataPath = new Path(path, "rMetadata").toString
   val pipelinePath = new Path(path, "pipeline").toString
 
-  val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+  val rMetadataStr = sparkSession.read.text(rMetadataPath)
+.first().getString(0)
   val rMetadata = parse(rMetadataStr)
   val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index 125cdf7259fe..5fc19450d219 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mll

(spark) branch master updated: [SPARK-47663][CORE][TESTS] add end to end test for task limiting according to different cpu and gpu configurations

2024-04-02 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new eb9b12692601 [SPARK-47663][CORE][TESTS] add end to end test for task 
limiting according to different cpu and gpu configurations
eb9b12692601 is described below

commit eb9b126926016e0156b1d40b1d7ed33d4705d2bb
Author: Bobby Wang 
AuthorDate: Tue Apr 2 15:30:10 2024 +0800

[SPARK-47663][CORE][TESTS] add end to end test for task limiting according 
to different cpu and gpu configurations

### What changes were proposed in this pull request?
Add an end-to-end unit test to ensure that the number of tasks is calculated 
correctly according to different task CPU and task GPU amounts.
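
A hedged sketch of the arithmetic the new test cases encode (4 CPUs and 4 GPU
addresses per executor, matching the configuration in the diff below):
concurrency is bounded by whichever resource runs out first.
```
def concurrent_tasks(executor_cpus: int, executor_gpus: int,
                     task_cpus: int, task_gpus: int) -> int:
    # Each resource independently limits how many tasks fit on the executor.
    return min(executor_cpus // task_cpus, executor_gpus // task_gpus)

# A few of the (taskCpus, taskGpus) cases from the test table below:
assert concurrent_tasks(4, 4, 1, 1) == 4   # one GPU address per task
assert concurrent_tasks(4, 4, 1, 2) == 2   # GPU groups ["0", "1"], ["2", "3"]
assert concurrent_tasks(4, 4, 3, 1) == 1   # a second 3-CPU task does not fit
```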

### Why are the changes needed?
To increase the test coverage. More details can be found at 
https://github.com/apache/spark/pull/45528#discussion_r1545905575

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The CI can pass.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45794 from wbo4958/end2end-test.

Authored-by: Bobby Wang 
Signed-off-by: Weichen Xu 
---
 .../CoarseGrainedSchedulerBackendSuite.scala   | 47 ++
 1 file changed, 47 insertions(+)

diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 6e94f9abe67b..a75f470deec3 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -179,6 +179,53 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 }
   }
 
+  // Every item corresponds to (CPU resources per task, GPU resources per task,
+  // and the GPU addresses assigned to all tasks).
+  Seq(
+(1, 1, Array(Array("0"), Array("1"), Array("2"), Array("3"))),
+(1, 2, Array(Array("0", "1"), Array("2", "3"))),
+(1, 4, Array(Array("0", "1", "2", "3"))),
+(2, 1, Array(Array("0"), Array("1"))),
+(4, 1, Array(Array("0"))),
+(4, 2, Array(Array("0", "1"))),
+(2, 2, Array(Array("0", "1"), Array("2", "3"))),
+(4, 4, Array(Array("0", "1", "2", "3"))),
+(1, 3, Array(Array("0", "1", "2"))),
+(3, 1, Array(Array("0")))
+  ).foreach { case (taskCpus, taskGpus, expectedGpuAddresses) =>
+test(s"SPARK-47663 end to end test validating if task cpus:${taskCpus} and 
" +
+  s"task gpus: ${taskGpus} works") {
+  withTempDir { dir =>
+val discoveryScript = createTempScriptWithExpectedOutput(
+  dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", 
"2", "3"]}""")
+val conf = new SparkConf()
+  .set(CPUS_PER_TASK, taskCpus)
+  .setMaster("local-cluster[1, 4, 1024]")
+  .setAppName("test")
+  .set(WORKER_GPU_ID.amountConf, "4")
+  .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+  .set(EXECUTOR_GPU_ID.amountConf, "4")
+  .set(TASK_GPU_ID.amountConf, taskGpus.toString)
+
+sc = new SparkContext(conf)
+eventually(timeout(executorUpTimeout)) {
+  // Ensure all executors have been launched.
+  assert(sc.getExecutorIds().length == 1)
+}
+
+val numPartitions = Seq(4 / taskCpus, 4 / taskGpus).min
+val ret = sc.parallelize(1 to 20, numPartitions).mapPartitions { _ =>
+  val tc = TaskContext.get()
+  assert(tc.cpus() == taskCpus)
+  val gpus = tc.resources()("gpu").addresses
+  Iterator.single(gpus)
+}.collect()
+
+assert(ret === expectedGpuAddresses)
+  }
+}
+  }
+
   // Here we just have test for one happy case instead of all cases: other 
cases are covered in
   // FsHistoryProviderSuite.
   test("custom log url for Spark UI is applied") {





(spark) branch master updated (081809667611 -> c4e4497ff7e7)

2024-02-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 081809667611 [MINOR][SQL] Remove `unsupportedOperationMsg` from 
`CaseInsensitiveStringMap`
 add c4e4497ff7e7 [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow 
support ResourceProfile

No new revisions were added by this update.

Summary of changes:
 .../sql/connect/planner/SparkConnectPlanner.scala  |   6 +-
 dev/sparktestsupport/modules.py|   1 +
 python/pyspark/sql/pandas/map_ops.py   |  61 +++-
 python/pyspark/sql/tests/test_resources.py | 104 +
 .../catalyst/analysis/DeduplicateRelations.scala   |   4 +-
 .../plans/logical/pythonLogicalOperators.scala |   7 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala  |   9 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  17 +++-
 .../spark/sql/execution/SparkStrategies.scala  |   8 +-
 .../sql/execution/python/MapInArrowExec.scala  |   4 +-
 .../sql/execution/python/MapInBatchExec.scala  |   6 +-
 .../sql/execution/python/MapInPandasExec.scala |   4 +-
 12 files changed, 206 insertions(+), 25 deletions(-)
 create mode 100644 python/pyspark/sql/tests/test_resources.py
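
Since this entry is only a change summary, here is a speculative usage sketch
of the feature named above; the `profile` keyword argument and the builder
calls are assumptions, not confirmed by this summary, and stage-level resource
profiles generally require a cluster manager that supports them.
```
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(8)

# Ask for 2 CPUs per task for the Python workers of this stage only.
treqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(treqs).build  # `build` is a property

def identity(batches):
    for batch in batches:  # batches are pandas DataFrames
        yield batch

# Assumed signature: an optional ResourceProfile argument on mapInPandas.
result = df.mapInPandas(identity, df.schema, profile=rp)
```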





[spark] branch master updated (8394ebb52b9 -> e1a7b84f47b)

2023-10-11 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 8394ebb52b9 [SPARK-45469][CORE][SQL][CONNECT][PYTHON] Replace 
`toIterator` with `iterator` for `IterableOnce`
 add e1a7b84f47b [SPARK-45397][ML][CONNECT] Add array assembler feature 
transformer

No new revisions were added by this update.

Summary of changes:
 .../docs/source/reference/pyspark.ml.connect.rst   |   1 +
 python/pyspark/ml/connect/base.py  |   6 +-
 python/pyspark/ml/connect/feature.py   | 156 -
 python/pyspark/ml/connect/util.py  |   6 +-
 python/pyspark/ml/param/_shared_params_code_gen.py |   7 +
 python/pyspark/ml/param/shared.py  |  22 +++
 .../ml/tests/connect/test_legacy_mode_feature.py   |  48 +++
 7 files changed, 238 insertions(+), 8 deletions(-)





[spark] branch master updated: [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188)

2023-10-02 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a0c9ab63f3b [SPARK-45386][SQL]: Fix correctness issue with persist 
using StorageLevel.NONE on Dataset (#43188)
a0c9ab63f3b is described below

commit a0c9ab63f3bcf4c9bb56c407375ce1c8cc26fb02
Author: Emil Ejbyfeldt 
AuthorDate: Mon Oct 2 11:36:53 2023 +0200

[SPARK-45386][SQL]: Fix correctness issue with persist using 
StorageLevel.NONE on Dataset (#43188)

* SPARK-45386: Fix correctness issue with StorageLevel.NONE

* Move to CacheManager

* Add comment
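
A hedged PySpark sketch of the user-visible behaviour being fixed (it mirrors
the Scala test added below; the fix itself lives in `CacheManager`):
persisting with `StorageLevel.NONE` should be a no-op and must not corrupt
query results.
```
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
ds = spark.createDataFrame([(1,), (2,)], ["value"]).persist(StorageLevel.NONE)

# With the fix, no cache entry is registered and the count is correct.
assert ds.count() == 2
```
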
---
 .../main/scala/org/apache/spark/sql/execution/CacheManager.scala| 4 +++-
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e0..e906c74f8a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
   planToCache: LogicalPlan,
   tableName: Option[String],
   storageLevel: StorageLevel): Unit = {
-if (lookupCachedData(planToCache).nonEmpty) {
+if (storageLevel == StorageLevel.NONE) {
+  // Do nothing for StorageLevel.NONE since it will not actually cache any 
data.
+} else if (lookupCachedData(planToCache).nonEmpty) {
   logWarning("Asked to cache already cached data.")
 } else {
   val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 04e619fa908..8fb25e120f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
@@ -2604,6 +2605,11 @@ class DatasetSuite extends QueryTest
 parameters = Map("cls" -> classOf[Array[Int]].getName))
 }
   }
+
+  test("SPARK-45386: persist with StorageLevel.NONE should give correct 
count") {
+val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+assert(ds.count() == 2)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest





[spark] branch branch-3.5 updated: [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality

2023-08-23 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 40ccabfd681 [SPARK-44908][ML][CONNECT] Fix cross validator foldCol 
param functionality
40ccabfd681 is described below

commit 40ccabfd68141eabe8f9b9bf15acad9fc6b7dff1
Author: Weichen Xu 
AuthorDate: Wed Aug 23 18:19:15 2023 +0800

[SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality

### What changes were proposed in this pull request?

Fix cross validator foldCol param functionality.
In the main branch the code calls `df.rdd` APIs, which are not supported in 
Spark Connect.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42605 from WeichenXu123/fix-tuning-connect-foldCol.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 0d1b5975b2d308c616312d53b9f7ad754348a266)
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/connect/tuning.py| 24 ++---
 .../ml/tests/connect/test_legacy_mode_tuning.py| 25 ++
 2 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/ml/connect/tuning.py 
b/python/pyspark/ml/connect/tuning.py
index c22c31e84e8..871e448966c 100644
--- a/python/pyspark/ml/connect/tuning.py
+++ b/python/pyspark/ml/connect/tuning.py
@@ -42,8 +42,7 @@ from pyspark.ml.connect.io_utils import (
 )
 from pyspark.ml.param import Params, Param, TypeConverters
 from pyspark.ml.param.shared import HasParallelism, HasSeed
-from pyspark.sql.functions import col, lit, rand, UserDefinedFunction
-from pyspark.sql.types import BooleanType
+from pyspark.sql.functions import col, lit, rand
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql import SparkSession
 
@@ -477,23 +476,14 @@ class CrossValidator(
 train = df.filter(~condition)
 datasets.append((train, validation))
 else:
-# Use user-specified fold numbers.
-def checker(foldNum: int) -> bool:
-if foldNum < 0 or foldNum >= nFolds:
-raise ValueError(
-"Fold number must be in range [0, %s), but got %s." % 
(nFolds, foldNum)
-)
-return True
-
-checker_udf = UserDefinedFunction(checker, BooleanType())
+# TODO:
+#  Add verification that foldCol column values are in range [0, 
nFolds)
 for i in range(nFolds):
-training = dataset.filter(checker_udf(dataset[foldCol]) & 
(col(foldCol) != lit(i)))
-validation = dataset.filter(
-checker_udf(dataset[foldCol]) & (col(foldCol) == lit(i))
-)
-if training.rdd.getNumPartitions() == 0 or 
len(training.take(1)) == 0:
+training = dataset.filter(col(foldCol) != lit(i))
+validation = dataset.filter(col(foldCol) == lit(i))
+if training.isEmpty():
 raise ValueError("The training data at fold %s is empty." 
% i)
-if validation.rdd.getNumPartitions() == 0 or 
len(validation.take(1)) == 0:
+if validation.isEmpty():
 raise ValueError("The validation data at fold %s is 
empty." % i)
 datasets.append((training, validation))
 
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py 
b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
index d6c813533d6..0ade227540c 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
@@ -246,6 +246,31 @@ class CrossValidatorTestsMixin:
 np.testing.assert_allclose(cv_model.avgMetrics, 
loaded_cv_model.avgMetrics)
 np.testing.assert_allclose(cv_model.stdMetrics, 
loaded_cv_model.stdMetrics)
 
+def test_crossvalidator_with_fold_col(self):
+sk_dataset = load_breast_cancer()
+
+train_dataset = self.spark.createDataFrame(
+zip(
+sk_dataset.data.tolist(),
+[int(t) for t in sk_dataset.target],
+[int(i % 3) for i in range(len(sk_dataset.target))],
+),
+schema="features: array, label: long, fold: long",
+)
+
+lorv2 = LORV2(numTrainWorkers=2)
+
+grid2 = ParamGridBuilder().addGrid(lorv2.maxIter, [2, 200]).build()
+cv = CrossValidator(
+estimator=lorv2,
+estimatorParamMaps=

[spark] branch master updated: [SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality

2023-08-23 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0d1b5975b2d [SPARK-44908][ML][CONNECT] Fix cross validator foldCol 
param functionality
0d1b5975b2d is described below

commit 0d1b5975b2d308c616312d53b9f7ad754348a266
Author: Weichen Xu 
AuthorDate: Wed Aug 23 18:19:15 2023 +0800

[SPARK-44908][ML][CONNECT] Fix cross validator foldCol param functionality

### What changes were proposed in this pull request?

Fix cross validator foldCol param functionality.
In the main branch the code calls `df.rdd` APIs, which are not supported in 
Spark Connect.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42605 from WeichenXu123/fix-tuning-connect-foldCol.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/connect/tuning.py| 24 ++---
 .../ml/tests/connect/test_legacy_mode_tuning.py| 25 ++
 2 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/ml/connect/tuning.py 
b/python/pyspark/ml/connect/tuning.py
index c22c31e84e8..871e448966c 100644
--- a/python/pyspark/ml/connect/tuning.py
+++ b/python/pyspark/ml/connect/tuning.py
@@ -42,8 +42,7 @@ from pyspark.ml.connect.io_utils import (
 )
 from pyspark.ml.param import Params, Param, TypeConverters
 from pyspark.ml.param.shared import HasParallelism, HasSeed
-from pyspark.sql.functions import col, lit, rand, UserDefinedFunction
-from pyspark.sql.types import BooleanType
+from pyspark.sql.functions import col, lit, rand
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql import SparkSession
 
@@ -477,23 +476,14 @@ class CrossValidator(
 train = df.filter(~condition)
 datasets.append((train, validation))
 else:
-# Use user-specified fold numbers.
-def checker(foldNum: int) -> bool:
-if foldNum < 0 or foldNum >= nFolds:
-raise ValueError(
-"Fold number must be in range [0, %s), but got %s." % 
(nFolds, foldNum)
-)
-return True
-
-checker_udf = UserDefinedFunction(checker, BooleanType())
+# TODO:
+#  Add verification that foldCol column values are in range [0, 
nFolds)
 for i in range(nFolds):
-training = dataset.filter(checker_udf(dataset[foldCol]) & 
(col(foldCol) != lit(i)))
-validation = dataset.filter(
-checker_udf(dataset[foldCol]) & (col(foldCol) == lit(i))
-)
-if training.rdd.getNumPartitions() == 0 or 
len(training.take(1)) == 0:
+training = dataset.filter(col(foldCol) != lit(i))
+validation = dataset.filter(col(foldCol) == lit(i))
+if training.isEmpty():
 raise ValueError("The training data at fold %s is empty." 
% i)
-if validation.rdd.getNumPartitions() == 0 or 
len(validation.take(1)) == 0:
+if validation.isEmpty():
 raise ValueError("The validation data at fold %s is 
empty." % i)
 datasets.append((training, validation))
 
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py 
b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
index d6c813533d6..0ade227540c 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
@@ -246,6 +246,31 @@ class CrossValidatorTestsMixin:
 np.testing.assert_allclose(cv_model.avgMetrics, 
loaded_cv_model.avgMetrics)
 np.testing.assert_allclose(cv_model.stdMetrics, 
loaded_cv_model.stdMetrics)
 
+def test_crossvalidator_with_fold_col(self):
+sk_dataset = load_breast_cancer()
+
+train_dataset = self.spark.createDataFrame(
+zip(
+sk_dataset.data.tolist(),
+[int(t) for t in sk_dataset.target],
+[int(i % 3) for i in range(len(sk_dataset.target))],
+),
+schema="features: array, label: long, fold: long",
+)
+
+lorv2 = LORV2(numTrainWorkers=2)
+
+grid2 = ParamGridBuilder().addGrid(lorv2.maxIter, [2, 200]).build()
+cv = CrossValidator(
+estimator=lorv2,
+estimatorParamMaps=grid2,
+parallelism=2,
+evaluator=BinaryClassificationEvaluator(),
+foldCol=&q

[spark] branch branch-3.5 updated: [SPARK-44909][ML] Skip starting torch distributor log streaming server when it is not available

2023-08-23 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 4f61662e91a [SPARK-44909][ML] Skip starting torch distributor log 
streaming server when it is not available
4f61662e91a is described below

commit 4f61662e91a77315a9d3d7454884f47c8bb9a6b1
Author: Weichen Xu 
AuthorDate: Wed Aug 23 15:30:50 2023 +0800

[SPARK-44909][ML] Skip starting torch distributor log streaming server when 
it is not available

### What changes were proposed in this pull request?

Skip starting torch distributor log streaming server when it is not 
available.

In some cases, e.g. in a Databricks Connect cluster, a network limitation can 
cause the log streaming server to fail to start, but this does not need to 
break the torch distributor training routine.

In this PR, the exception raised from the log server's `start` method is 
captured, and the server port is set to -1 if `start` fails.

### Why are the changes needed?

In some cases, e.g. in a Databricks Connect cluster, a network limitation can 
cause the log streaming server to fail to start, but this does not need to 
break the torch distributor training routine.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42606 from WeichenXu123/fix-torch-log-server-in-connect-mode.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 80668dc1a36ac0def80f3c18f981fbdacfb2904d)
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/torch/distributor.py   | 16 +---
 python/pyspark/ml/torch/log_communication.py |  3 +++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/torch/distributor.py 
b/python/pyspark/ml/torch/distributor.py
index b407672ac48..d0979f53d41 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -765,9 +765,19 @@ class TorchDistributor(Distributor):
 log_streaming_server = LogStreamingServer()
 self.driver_address = _get_conf(self.spark, "spark.driver.host", "")
 assert self.driver_address != ""
-log_streaming_server.start(spark_host_address=self.driver_address)
-time.sleep(1)  # wait for the server to start
-self.log_streaming_server_port = log_streaming_server.port
+try:
+log_streaming_server.start(spark_host_address=self.driver_address)
+time.sleep(1)  # wait for the server to start
+self.log_streaming_server_port = log_streaming_server.port
+except Exception as e:
+# If starting log streaming server failed, we don't need to break
+# the distributor training but emit a warning instead.
+self.log_streaming_server_port = -1
+self.logger.warning(
+"Start torch distributor log streaming server failed, "
+"You cannot receive logs sent from distributor workers, ",
+f"error: {repr(e)}.",
+)
 
 try:
 spark_task_function = self._get_spark_task_function(
diff --git a/python/pyspark/ml/torch/log_communication.py 
b/python/pyspark/ml/torch/log_communication.py
index ca91121d3e3..8efa83e62c3 100644
--- a/python/pyspark/ml/torch/log_communication.py
+++ b/python/pyspark/ml/torch/log_communication.py
@@ -156,6 +156,9 @@ class LogStreamingClient(LogStreamingClientBase):
 warnings.warn(f"{error_msg}: {traceback.format_exc()}\n")
 
 def _connect(self) -> None:
+if self.port == -1:
+self._fail("Log streaming server is not available.")
+return
 try:
 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 sock.settimeout(self.timeout)





[spark] branch master updated (00cd5e846b6 -> 80668dc1a36)

2023-08-23 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 00cd5e846b6 [SPARK-44899][PYTHON][DOCS] Refine the docstring of 
DataFrame.collect
 add 80668dc1a36 [SPARK-44909][ML] Skip starting torch distributor log 
streaming server when it is not available

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/torch/distributor.py   | 16 +---
 python/pyspark/ml/torch/log_communication.py |  3 +++
 2 files changed, 16 insertions(+), 3 deletions(-)





[spark] branch master updated: [SPARK-44264][ML][PYTHON] Incorporating FunctionPickler Into TorchDistributor

2023-07-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2ab70576d68 [SPARK-44264][ML][PYTHON] Incorporating FunctionPickler 
Into TorchDistributor
2ab70576d68 is described below

commit 2ab70576d68d07aa69dc1e5a9264e0c9e9f05df0
Author: Mathew Jacob 
AuthorDate: Wed Jul 19 08:29:33 2023 +0800

[SPARK-44264][ML][PYTHON] Incorporating FunctionPickler Into 
TorchDistributor

### What changes were proposed in this pull request?
For the pickling step of running distributed training on functions, I migrated 
the TorchDistributor to leverage the FunctionPickler class instead of internal 
helper functions. This separates responsibilities better and reuses code that 
is already written and tested.

FunctionPickler PR is [here](https://github.com/apache/spark/pull/41946).

### Why are the changes needed?
It separates responsibilities, letting the TorchDistributor class deal with a 
more focused set of concerns.

### Does this PR introduce _any_ user-facing change?
No, this is all internal.

### How was this patch tested?
Existing TorchDistributor tests.

Closes #42045 from mathewjacob1002/integrate_fn_pickler.

Authored-by: Mathew Jacob 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/torch/distributor.py | 49 +-
 1 file changed, 7 insertions(+), 42 deletions(-)

diff --git a/python/pyspark/ml/torch/distributor.py 
b/python/pyspark/ml/torch/distributor.py
index 5b5d7c2288a..81c9d03abcf 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -48,6 +48,7 @@ from pyspark.ml.torch.log_communication import (  # type: 
ignore
 LogStreamingClient,
 LogStreamingServer,
 )
+from pyspark.ml.dl_util import FunctionPickler
 from pyspark.ml.util import _get_active_session
 
 
@@ -746,12 +747,13 @@ class TorchDistributor(Distributor):
 train_fn: Callable, *args: Any, **kwargs: Any
 ) -> Generator[Tuple[str, str], None, None]:
 save_dir = TorchDistributor._create_save_dir()
-pickle_file_path = TorchDistributor._save_pickled_function(
-save_dir, train_fn, *args, **kwargs
+pickle_file_path = FunctionPickler.pickle_fn_and_save(
+train_fn, "", save_dir, *args, **kwargs
 )
 output_file_path = os.path.join(save_dir, 
TorchDistributor._PICKLED_OUTPUT_FILE)
-train_file_path = TorchDistributor._create_torchrun_train_file(
-save_dir, pickle_file_path, output_file_path
+script_path = os.path.join(save_dir, TorchDistributor._TRAIN_FILE)
+train_file_path = FunctionPickler.create_fn_run_script(
+pickle_file_path, output_file_path, script_path
 )
 try:
 yield (train_file_path, output_file_path)
@@ -817,7 +819,7 @@ class TorchDistributor(Distributor):
 "View stdout logs for detailed error message."
 )
 try:
-output = TorchDistributor._get_pickled_output(output_file_path)
+output = FunctionPickler.get_fn_output(output_file_path)
 except Exception as e:
 raise RuntimeError(
 "TorchDistributor failed due to a pickling error. "
@@ -834,43 +836,6 @@ class TorchDistributor(Distributor):
 def _cleanup_files(save_dir: str) -> None:
 shutil.rmtree(save_dir, ignore_errors=True)
 
-@staticmethod
-def _save_pickled_function(
-save_dir: str, train_fn: Union[str, Callable], *args: Any, **kwargs: 
Any
-) -> str:
-saved_pickle_path = os.path.join(save_dir, 
TorchDistributor._PICKLED_FUNC_FILE)
-with open(saved_pickle_path, "wb") as f:
-cloudpickle.dump((train_fn, args, kwargs), f)
-return saved_pickle_path
-
-@staticmethod
-def _create_torchrun_train_file(
-save_dir_path: str, pickle_file_path: str, output_file_path: str
-) -> str:
-code = textwrap.dedent(
-f"""
-from pyspark import cloudpickle
-import os
-
-if __name__ == "__main__":
-with open("{pickle_file_path}", "rb") as f:
-train_fn, args, kwargs = cloudpickle.load(f)
-output = train_fn(*args, **kwargs)
-with open("{output_file_path}", "wb") as f:
-cloudpickle.dump(output, f)
-"""
-)
-saved_file_path = os.path.join(save_dir_path, 
TorchDistributor._TRAIN_FILE)
-with open(saved_file_path, "w") as f:
-f.w

[spark] branch master updated (0d90f2a8ea0 -> 054f94af95f)

2023-07-11 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 0d90f2a8ea0 [SPARK-44264][ML][PYTHON] Write a Deepspeed Distributed 
Learning Class DeepspeedTorchDistributor
 add 054f94af95f [SPARK-44374][PYTHON][ML] Add example code for distributed 
ML for spark connect

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/connect/classification.py | 25 +
 python/pyspark/ml/connect/evaluation.py | 38 
 python/pyspark/ml/connect/feature.py| 56 +
 python/pyspark/ml/connect/pipeline.py   | 28 +++
 python/pyspark/ml/connect/tuning.py | 24 -
 5 files changed, 170 insertions(+), 1 deletion(-)





[spark] branch master updated: [SPARK-43983][PYTHON][ML][CONNECT] Implement cross validator estimator

2023-07-10 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 01918bb9017 [SPARK-43983][PYTHON][ML][CONNECT] Implement cross 
validator estimator
01918bb9017 is described below

commit 01918bb90170c13abd6c0f0f5c47f5d9bcc02adc
Author: Weichen Xu 
AuthorDate: Tue Jul 11 07:00:19 2023 +0800

[SPARK-43983][PYTHON][ML][CONNECT] Implement cross validator estimator

### What changes were proposed in this pull request?

Implement cross validator estimator for spark connect.

### Why are the changes needed?

Distributed ML on spark connect project.

### Does this PR introduce _any_ user-facing change?

Yes.

New class `pyspark.ml.connect.tuning.CrossValidator` and 
`pyspark.ml.connect.tuning.CrossValidatorModel` are added.
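
A hedged usage sketch of the new classes (the estimator, evaluator, and their
parameters are borrowed from sibling `pyspark.ml.connect` modules and are
assumptions here, not part of this commit message):
```
from pyspark.ml.connect.classification import LogisticRegression
from pyspark.ml.connect.evaluation import BinaryClassificationEvaluator
from pyspark.ml.connect.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

lr = LogisticRegression(maxIter=20)
grid = ParamGridBuilder().addGrid(lr.maxIter, [2, 200]).build()

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3,
    parallelism=2,
)
# cv_model = cv.fit(train_df)  # train_df: a DataFrame with features / label
```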

### How was this patch tested?

Unit tests.

Closes #41881 from WeichenXu123/SPARK-43983-cross-val.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 dev/sparktestsupport/modules.py|   2 +
 python/pyspark/ml/connect/__init__.py  |   2 +
 python/pyspark/ml/connect/base.py  |  18 +-
 python/pyspark/ml/connect/evaluation.py|  83 ++-
 python/pyspark/ml/connect/io_utils.py  |  76 ++-
 python/pyspark/ml/connect/pipeline.py  |  47 +-
 python/pyspark/ml/connect/tuning.py| 566 +
 .../ml/tests/connect/test_connect_tuning.py|  45 ++
 .../tests/connect/test_legacy_mode_evaluation.py   |  31 ++
 .../ml/tests/connect/test_legacy_mode_tuning.py| 267 ++
 10 files changed, 1080 insertions(+), 57 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 439ae40a0f8..2090546512f 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -620,6 +620,7 @@ pyspark_ml = Module(
 "pyspark.ml.tests.connect.test_legacy_mode_feature",
 "pyspark.ml.tests.connect.test_legacy_mode_classification",
 "pyspark.ml.tests.connect.test_legacy_mode_pipeline",
+"pyspark.ml.tests.connect.test_legacy_mode_tuning",
 ],
 excluded_python_implementations=[
 "PyPy"  # Skip these tests under PyPy since they require numpy and it 
isn't available there
@@ -866,6 +867,7 @@ pyspark_connect = Module(
 "pyspark.ml.tests.connect.test_connect_feature",
 "pyspark.ml.tests.connect.test_connect_classification",
 "pyspark.ml.tests.connect.test_connect_pipeline",
+"pyspark.ml.tests.connect.test_connect_tuning",
 ],
 excluded_python_implementations=[
 "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
diff --git a/python/pyspark/ml/connect/__init__.py 
b/python/pyspark/ml/connect/__init__.py
index 2e048355d74..2ee152f6a38 100644
--- a/python/pyspark/ml/connect/__init__.py
+++ b/python/pyspark/ml/connect/__init__.py
@@ -26,6 +26,7 @@ from pyspark.ml.connect.base import (
 from pyspark.ml.connect import (
 feature,
 evaluation,
+tuning,
 )
 
 from pyspark.ml.connect.pipeline import Pipeline, PipelineModel
@@ -39,4 +40,5 @@ __all__ = [
 "evaluation",
 "Pipeline",
 "PipelineModel",
+"tuning",
 ]
diff --git a/python/pyspark/ml/connect/base.py 
b/python/pyspark/ml/connect/base.py
index f86b1e928c2..f8ce0cb6962 100644
--- a/python/pyspark/ml/connect/base.py
+++ b/python/pyspark/ml/connect/base.py
@@ -146,7 +146,9 @@ class Transformer(Params, metaclass=ABCMeta):
 """
 raise NotImplementedError()
 
-def transform(self, dataset: Union[DataFrame, pd.DataFrame]) -> 
Union[DataFrame, pd.DataFrame]:
+def transform(
+self, dataset: Union[DataFrame, pd.DataFrame], params: 
Optional["ParamMap"] = None
+) -> Union[DataFrame, pd.DataFrame]:
 """
 Transforms the input dataset.
 The dataset can be either pandas dataframe or spark dataframe,
@@ -163,12 +165,24 @@ class Transformer(Params, metaclass=ABCMeta):
 dataset : :py:class:`pyspark.sql.DataFrame` or 
py:class:`pandas.DataFrame`
 input dataset.
 
+params : dict, optional
+an optional param map that overrides embedded params.
+
 Returns
 ---
 :py:class:`pyspark.sql.DataFrame` or py:class:`pandas.DataFrame`
 transformed dataset, the type of output dataframe is consistent 
with
 input dataframe.
 """
+if params is None:
+params = dict()
+if isinstance(params, dict):
+if par

[spark] branch master updated (7bc28d54f83 -> 7fcabef2874)

2023-07-04 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 7bc28d54f83 [SPARK-44269][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[2310-2314]
 add 7fcabef2874 [SPARK-44250][ML][PYTHON][CONNECT] Implement 
classification evaluator

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/connect/evaluation.py| 161 -
 .../tests/connect/test_legacy_mode_evaluation.py   |  77 +-
 2 files changed, 202 insertions(+), 36 deletions(-)





[spark] branch master updated (0865c0db923 -> 35b3a18ff04)

2023-06-20 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 0865c0db923 [SPARK-43944][SPARK-43942][SQL][FOLLOWUP] Directly 
leverage `UnresolvedFunction` for functions `startswith`/`endswith`/`contains`
 add 35b3a18ff04 [SPARK-44100][ML][CONNECT][PYTHON] Move namespace from 
`pyspark.mlv2` to `pyspark.ml.connect`

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py| 18 +-
 python/mypy.ini|  3 --
 python/pyspark/ml/connect/__init__.py  | 24 +
 python/pyspark/{mlv2 => ml/connect}/base.py|  2 +-
 .../pyspark/{mlv2 => ml/connect}/classification.py |  6 ++--
 python/pyspark/{mlv2 => ml/connect}/evaluation.py  |  6 ++--
 python/pyspark/{mlv2 => ml/connect}/feature.py |  6 ++--
 python/pyspark/{mlv2 => ml/connect}/io_utils.py|  0
 python/pyspark/{mlv2 => ml/connect}/pipeline.py|  8 ++---
 python/pyspark/{mlv2 => ml/connect}/summarizer.py  |  2 +-
 python/pyspark/{mlv2 => ml/connect}/util.py|  0
 .../tests/connect/test_connect_classification.py}  |  6 ++--
 .../tests/connect/test_connect_evaluation.py}  |  4 +--
 .../tests/connect/test_connect_feature.py} |  4 +--
 .../tests/connect/test_connect_pipeline.py}|  6 ++--
 .../tests/connect/test_connect_summarizer.py}  |  4 +--
 .../connect/test_legacy_mode_classification.py}|  6 ++--
 .../tests/connect/test_legacy_mode_evaluation.py}  |  4 +--
 .../tests/connect/test_legacy_mode_feature.py} |  4 +--
 .../tests/connect/test_legacy_mode_pipeline.py}| 10 +++---
 .../tests/connect/test_legacy_mode_summarizer.py}  |  4 +--
 python/pyspark/mlv2/__init__.py| 40 --
 22 files changed, 75 insertions(+), 92 deletions(-)
 rename python/pyspark/{mlv2 => ml/connect}/base.py (99%)
 rename python/pyspark/{mlv2 => ml/connect}/classification.py (98%)
 rename python/pyspark/{mlv2 => ml/connect}/evaluation.py (95%)
 rename python/pyspark/{mlv2 => ml/connect}/feature.py (97%)
 rename python/pyspark/{mlv2 => ml/connect}/io_utils.py (100%)
 rename python/pyspark/{mlv2 => ml/connect}/pipeline.py (96%)
 rename python/pyspark/{mlv2 => ml/connect}/summarizer.py (98%)
 rename python/pyspark/{mlv2 => ml/connect}/util.py (100%)
 copy python/pyspark/{mlv2/tests/connect/test_parity_classification.py => 
ml/tests/connect/test_connect_classification.py} (84%)
 rename python/pyspark/{mlv2/tests/connect/test_parity_evaluation.py => 
ml/tests/connect/test_connect_evaluation.py} (89%)
 rename python/pyspark/{mlv2/tests/connect/test_parity_feature.py => 
ml/tests/connect/test_connect_feature.py} (89%)
 rename python/pyspark/{mlv2/tests/connect/test_parity_classification.py => 
ml/tests/connect/test_connect_pipeline.py} (85%)
 rename python/pyspark/{mlv2/tests/connect/test_parity_summarizer.py => 
ml/tests/connect/test_connect_summarizer.py} (88%)
 rename python/pyspark/{mlv2/tests/test_classification.py => 
ml/tests/connect/test_legacy_mode_classification.py} (98%)
 rename python/pyspark/{mlv2/tests/test_evaluation.py => 
ml/tests/connect/test_legacy_mode_evaluation.py} (94%)
 rename python/pyspark/{mlv2/tests/test_feature.py => 
ml/tests/connect/test_legacy_mode_feature.py} (97%)
 rename python/pyspark/{mlv2/tests/test_pipeline.py => 
ml/tests/connect/test_legacy_mode_pipeline.py} (95%)
 rename python/pyspark/{mlv2/tests/test_summarizer.py => 
ml/tests/connect/test_legacy_mode_summarizer.py} (94%)
 delete mode 100644 python/pyspark/mlv2/__init__.py





[spark] branch master updated: [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on spark connect

2023-06-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c0c226d901 [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline 
estimator for ML on spark connect
6c0c226d901 is described below

commit 6c0c226d90192e54a4965b6d69905936619e20d6
Author: Weichen Xu 
AuthorDate: Mon Jun 19 21:36:21 2023 +0800

[SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on 
spark connect

### What changes were proposed in this pull request?

Implement pipeline estimator for ML on spark connect

### Why are the changes needed?

See Distributed ML <> spark connect project design doc:

https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.x8uc4xogrzbk

### Does this PR introduce _any_ user-facing change?

Yes. New estimator `pyspark.mlv2.pipeline.Pipeline` is added.
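
A hedged usage sketch for the new estimator (the stage classes come from the
sibling `pyspark.mlv2` modules in this thread; their parameter names are
assumptions here):
```
from pyspark.mlv2.classification import LogisticRegression
from pyspark.mlv2.feature import StandardScaler
from pyspark.mlv2.pipeline import Pipeline

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

pipeline = Pipeline(stages=[scaler, lr])
# model = pipeline.fit(train_df)  # accepts a Spark or pandas DataFrame
```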

### How was this patch tested?

Unit tests.

Closes #41479 from WeichenXu123/mlv2-pipeline.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/mlv2/__init__.py|   4 +
 python/pyspark/mlv2/classification.py  |   6 +-
 python/pyspark/mlv2/feature.py |   6 +-
 python/pyspark/mlv2/io_utils.py| 187 ++
 python/pyspark/mlv2/pipeline.py| 241 +
 python/pyspark/mlv2/tests/test_pipeline.py | 184 ++
 6 files changed, 561 insertions(+), 67 deletions(-)

diff --git a/python/pyspark/mlv2/__init__.py b/python/pyspark/mlv2/__init__.py
index 990b4fa9da8..352d24baabe 100644
--- a/python/pyspark/mlv2/__init__.py
+++ b/python/pyspark/mlv2/__init__.py
@@ -26,6 +26,8 @@ from pyspark.mlv2 import (
 evaluation,
 )
 
+from pyspark.mlv2.pipeline import Pipeline, PipelineModel
+
 __all__ = [
 "Estimator",
 "Transformer",
@@ -33,4 +35,6 @@ __all__ = [
 "Model",
 "feature",
 "evaluation",
+"Pipeline",
+"PipelineModel",
 ]
diff --git a/python/pyspark/mlv2/classification.py 
b/python/pyspark/mlv2/classification.py
index fe0d76837f9..522c54b5289 100644
--- a/python/pyspark/mlv2/classification.py
+++ b/python/pyspark/mlv2/classification.py
@@ -40,7 +40,7 @@ from pyspark.ml.param.shared import (
 HasMomentum,
 )
 from pyspark.mlv2.base import Predictor, PredictionModel
-from pyspark.mlv2.io_utils import ParamsReadWrite, ModelReadWrite
+from pyspark.mlv2.io_utils import ParamsReadWrite, CoreModelReadWrite
 from pyspark.sql.functions import lit, count, countDistinct
 
 import torch
@@ -253,7 +253,9 @@ class LogisticRegression(
 
 
 @inherit_doc
-class LogisticRegressionModel(PredictionModel, _LogisticRegressionParams, 
ModelReadWrite):
+class LogisticRegressionModel(
+PredictionModel, _LogisticRegressionParams, ParamsReadWrite, 
CoreModelReadWrite
+):
 """
 Model fitted by LogisticRegression.
 
diff --git a/python/pyspark/mlv2/feature.py b/python/pyspark/mlv2/feature.py
index 57c6213d2bb..a58f214711c 100644
--- a/python/pyspark/mlv2/feature.py
+++ b/python/pyspark/mlv2/feature.py
@@ -24,7 +24,7 @@ from pyspark import keyword_only
 from pyspark.sql import DataFrame
 from pyspark.ml.param.shared import HasInputCol, HasOutputCol
 from pyspark.mlv2.base import Estimator, Model
-from pyspark.mlv2.io_utils import ParamsReadWrite, ModelReadWrite
+from pyspark.mlv2.io_utils import ParamsReadWrite, CoreModelReadWrite
 from pyspark.mlv2.summarizer import summarize_dataframe
 
 
@@ -61,7 +61,7 @@ class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol, 
ParamsReadWrite):
 return self._copyValues(model)
 
 
-class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ModelReadWrite):
+class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, 
CoreModelReadWrite):
 def __init__(
 self, max_abs_values: Optional["np.ndarray"] = None, n_samples_seen: 
Optional[int] = None
 ) -> None:
@@ -143,7 +143,7 @@ class StandardScaler(Estimator, HasInputCol, HasOutputCol, 
ParamsReadWrite):
 return self._copyValues(model)
 
 
-class StandardScalerModel(Model, HasInputCol, HasOutputCol, ModelReadWrite):
+class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, 
CoreModelReadWrite):
 def __init__(
 self,
 mean_values: Optional["np.ndarray"] = None,
diff --git a/python/pyspark/mlv2/io_utils.py b/python/pyspark/mlv2/io_utils.py
index 8f7263206a7..c701736712f 100644
--- a/python/pyspark/mlv2/io_utils.py
+++ b/python/pyspark/mlv2/io_utils.py
@@ -21,7 +21,8 @@ import os
 import tempfile
 import time
 from urllib.parse import urlparse
-from typing import Any, Dict, Optional
+from typing import Any, Dict,

[spark] branch master updated: [SPARK-43097][FOLLOW-UP][ML] Improve logistic regression model saving

2023-06-17 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4e0bd3c5717 [SPARK-43097][FOLLOW-UP][ML] Improve logistic regression 
model saving
4e0bd3c5717 is described below

commit 4e0bd3c571758f441d56d0341d6c4f506fdb1565
Author: Weichen Xu 
AuthorDate: Sat Jun 17 22:30:39 2023 +0800

[SPARK-43097][FOLLOW-UP][ML] Improve logistic regression model saving

### What changes were proposed in this pull request?

Improve logistic regression model saving:

In the current master code, the saved core PyTorch model only includes the
"Linear" layer. To make the saved PyTorch model easier to use on its own
without PySpark, this PR appends a "Softmax" layer to the torch model before
saving it.

### Why are the changes needed?

Improving the saved pytorch model in `LogisticRegressionModel.save`

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

    Closes #41629 from WeichenXu123/improve-lor-model-save.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/mlv2/classification.py|  9 +++--
 python/pyspark/mlv2/tests/test_classification.py | 19 +++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/mlv2/classification.py 
b/python/pyspark/mlv2/classification.py
index 0fcded0d769..fe0d76837f9 100644
--- a/python/pyspark/mlv2/classification.py
+++ b/python/pyspark/mlv2/classification.py
@@ -331,10 +331,15 @@ class LogisticRegressionModel(PredictionModel, 
_LogisticRegressionParams, ModelR
 return self.__class__.__name__ + ".torch"
 
 def _save_core_model(self, path: str) -> None:
-torch.save(self.torch_model, path)
+lor_torch_model = torch_nn.Sequential(
+self.torch_model,
+torch_nn.Softmax(dim=1),
+)
+torch.save(lor_torch_model, path)
 
 def _load_core_model(self, path: str) -> None:
-self.torch_model = torch.load(path)
+lor_torch_model = torch.load(path)
+self.torch_model = lor_torch_model[0]
 
 def _get_extra_metadata(self) -> Dict[str, Any]:
 return {
diff --git a/python/pyspark/mlv2/tests/test_classification.py 
b/python/pyspark/mlv2/tests/test_classification.py
index 159862ef5f6..7f7d43b9cc8 100644
--- a/python/pyspark/mlv2/tests/test_classification.py
+++ b/python/pyspark/mlv2/tests/test_classification.py
@@ -167,10 +167,29 @@ class ClassificationTestsMixin:
 )
 
 model = estimator.fit(training_dataset)
+model_predictions = model.transform(eval_df1.toPandas())
+
 assert model.uid == estimator.uid
 
 local_model_path = os.path.join(tmp_dir, "model")
 model.saveToLocal(local_model_path)
+
+# test saved torch model can be loaded by pytorch solely
+lor_torch_model = torch.load(
+os.path.join(local_model_path, "LogisticRegressionModel.torch")
+)
+
+with torch.inference_mode():
+torch_infer_result = lor_torch_model(
+torch.tensor(np.stack(list(eval_df1.toPandas().features)), 
dtype=torch.float32)
+).numpy()
+
+np.testing.assert_allclose(
+np.stack(list(model_predictions.probability)),
+torch_infer_result,
+rtol=1e-4,
+)
+
 loaded_model = LORV2Model.loadFromLocal(local_model_path)
 assert loaded_model.numFeatures == 2
 assert loaded_model.numClasses == 2


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43981][PYTHON][ML] Basic saving / loading implementation for ML on spark connect

2023-06-13 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a5d3bea04eb [SPARK-43981][PYTHON][ML] Basic saving / loading 
implementation for ML on spark connect
a5d3bea04eb is described below

commit a5d3bea04eb8430fd747905633b8164e21c95190
Author: Weichen Xu 
AuthorDate: Wed Jun 14 12:42:22 2023 +0800

[SPARK-43981][PYTHON][ML] Basic saving / loading implementation for ML on 
spark connect

### What changes were proposed in this pull request?

* Base class / helper functions for saving/loading estimator / transformer 
/ evaluator / model.
* Add saving/loading implementation for feature transformers.
* Add saving/loading implementation for logistic regression estimator.

Design goals:

* The model format is decoupled from Spark, i.e. we can run model inference
without a Spark service.
* We can save the model to either a local file system or a cloud storage file
system.
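
A minimal sketch of how these goals look from the Python side. The
`saveToLocal` / `loadFromLocal` entry points appear in the test changes in this
series; the directory path and training dataframe here are placeholders.

```python
from pyspark.mlv2.classification import LogisticRegression, LogisticRegressionModel

train_df = spark.createDataFrame(
    [(1.0, [0.0, 5.0]), (0.0, [1.0, 2.0])] * 100, ["label", "features"]
)

model = LogisticRegression(maxIter=2, numTrainWorkers=2).fit(train_df)

# The saved directory contains plain metadata plus a torch checkpoint, so it is
# decoupled from Spark and can be reloaded without a running Spark service.
model.saveToLocal("/tmp/lor_model")
reloaded = LogisticRegressionModel.loadFromLocal("/tmp/lor_model")
reloaded.transform(train_df.toPandas())
```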

### Why are the changes needed?

We need to support saving/loading estimator / transformer / evaluator / 
model.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Unit tests.

Closes #41478 from WeichenXu123/mlv2-read-write.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../scala/org/apache/spark/ml/python/MLUtil.scala  |  43 
 python/pyspark/ml/torch/distributor.py |  14 +-
 python/pyspark/ml/util.py  |  13 ++
 python/pyspark/mlv2/classification.py  | 118 ++
 python/pyspark/mlv2/evaluation.py  |   3 +-
 python/pyspark/mlv2/feature.py | 133 ---
 python/pyspark/mlv2/io_utils.py| 242 +
 python/pyspark/mlv2/summarizer.py  |   4 +-
 .../tests/connect/test_parity_classification.py|   6 +-
 python/pyspark/mlv2/tests/test_classification.py   |  84 ++-
 python/pyspark/mlv2/tests/test_feature.py  |  58 -
 11 files changed, 634 insertions(+), 84 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala 
b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
new file mode 100644
index 000..5e2b8943ed8
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.python
+
+import java.nio.file.Paths
+
+import org.apache.hadoop.fs.{Path => FSPath}
+
+import org.apache.spark.sql.SparkSession
+
+
+object MLUtil {
+
+  def copyFileFromLocalToFs(localPath: String, destPath: String): Unit = {
+val sparkContext = SparkSession.getActiveSession.get.sparkContext
+
+val hadoopConf = sparkContext.hadoopConfiguration
+assert(
+  Paths.get(destPath).isAbsolute,
+  "Destination path must be an absolute path on cloud storage."
+)
+val destFSPath = new FSPath(destPath)
+val fs = destFSPath.getFileSystem(hadoopConf)
+
+fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), 
destFSPath)
+  }
+
+}
diff --git a/python/pyspark/ml/torch/distributor.py 
b/python/pyspark/ml/torch/distributor.py
index be49dc147c0..2ed70854cc6 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -48,19 +48,7 @@ from pyspark.ml.torch.log_communication import (  # type: 
ignore
 LogStreamingClient,
 LogStreamingServer,
 )
-
-
-def _get_active_session(is_remote: bool) -> SparkSession:
-if not is_remote:
-spark = SparkSession.getActiveSession()
-else:
-import pyspark.sql.connect.session
-
-spark = pyspark.sql.connect.session._active_spark_session  # type: 
ignore[assignment]
-
-if spark is None:
-raise RuntimeError("An active SparkSession is required for the 
distributor.")
-return spark
+from pyspark.ml.util import _get_active_session
 
 
 def _ge

[spark] branch master updated (fead8a7962a -> 89de4f79e7f)

2023-06-07 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts
 add 89de4f79e7f [SPARK-43790][PYTHON][CONNECT][ML] Add `copyFromLocalToFs` 
API

No new revisions were added by this update.

Summary of changes:
 .../artifact/SparkConnectArtifactManager.scala | 39 ++
 .../apache/spark/sql/connect/config/Connect.scala  | 15 +
 .../service/SparkConnectAddArtifactsHandler.scala  |  7 +++-
 .../connect/artifact/ArtifactManagerSuite.scala| 18 +-
 python/pyspark/sql/connect/client/artifact.py  | 33 +++---
 python/pyspark/sql/connect/client/core.py  |  3 ++
 python/pyspark/sql/connect/session.py  | 29 
 .../sql/tests/connect/client/test_artifact.py  | 21 
 8 files changed, 158 insertions(+), 7 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43097][ML] New pyspark ML logistic regression estimator implemented on top of distributor

2023-06-06 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a82b42bcb1 [SPARK-43097][ML] New pyspark ML logistic regression 
estimator implemented on top of distributor
2a82b42bcb1 is described below

commit 2a82b42bcb100fade622d3d08ef5d316425d3e5a
Author: Weichen Xu 
AuthorDate: Tue Jun 6 21:53:46 2023 +0800

[SPARK-43097][ML] New pyspark ML logistic regression estimator implemented 
on top of distributor

### What changes were proposed in this pull request?

This PR takes over https://github.com/apache/spark/pull/40748

Closes https://github.com/apache/spark/pull/40748

### Why are the changes needed?

Distributed ML on spark connect project.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Unit tests.

Manually testing code:

Start local pyspark shell by:

```
bin/pyspark --remote "local[2]"# spark connect mode
```
or
```
bin/pyspark --master "local[2]"   # legacy mode
```
to launch pyspark shell,

Then, paste following code to pyspark shell:
```
from pyspark.mlv2.classification import LogisticRegression as LORV2
lorv2 = LORV2(maxIter=2, numTrainWorkers=2)

from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
[
(1.0, Vectors.dense(0.0, 5.0)),
(0.0, Vectors.dense(1.0, 2.0)),
(1.0, Vectors.dense(2.0, 1.0)),
(0.0, Vectors.dense(3.0, 3.0)),
] * 100,
["label", "features"],
)

model = lorv2.fit(df)

model.transform(df).show(truncate=False)
model.transform(df.toPandas())

model.set(model.probabilityCol, "")
model.transform(df).show(truncate=False)
model.transform(df.toPandas())
```

Closes #41383 from WeichenXu123/lor-torch-2.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 dev/sparktestsupport/modules.py|   2 +
 python/pyspark/ml/param/_shared_params_code_gen.py |  24 ++
 python/pyspark/ml/param/shared.py  |  89 ++
 python/pyspark/ml/torch/data.py|  13 +-
 python/pyspark/ml/torch/distributor.py |  13 +-
 python/pyspark/mlv2/base.py|  75 -
 python/pyspark/mlv2/classification.py  | 306 +
 .../tests/connect/test_parity_classification.py|  41 +++
 python/pyspark/mlv2/tests/test_classification.py   | 139 ++
 9 files changed, 696 insertions(+), 6 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index aa755033aa4..ecc471fd700 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -609,6 +609,7 @@ pyspark_ml = Module(
 "pyspark.mlv2.tests.test_summarizer",
 "pyspark.mlv2.tests.test_evaluation",
 "pyspark.mlv2.tests.test_feature",
+"pyspark.mlv2.tests.test_classification",
 ],
 excluded_python_implementations=[
 "PyPy"  # Skip these tests under PyPy since they require numpy and it 
isn't available there
@@ -823,6 +824,7 @@ pyspark_connect = Module(
 "pyspark.mlv2.tests.connect.test_parity_summarizer",
 "pyspark.mlv2.tests.connect.test_parity_evaluation",
 "pyspark.mlv2.tests.connect.test_parity_feature",
+"pyspark.mlv2.tests.connect.test_parity_classification",
 ],
 excluded_python_implementations=[
 "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py 
b/python/pyspark/ml/param/_shared_params_code_gen.py
index 5df1782084a..2bec3a5053f 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -332,6 +332,30 @@ if __name__ == "__main__":
 "0.0",
 "TypeConverters.toFloat",
 ),
+(
+"numTrainWorkers",
+"number of training workers",
+"1",
+"TypeConverters.toInt",
+),
+(
+"batchSize",
+"number of training batch size",
+None,
+"TypeConverters.toInt",
+),
+(
+"learningRate",
+"learning rate for training",
+None,
+"TypeConverters.toFloat",
+),
+(
+"momentum",
+"momentum for training op

[spark] branch master updated (51a919ea8d6 -> 1df1d7661a3)

2023-06-06 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 51a919ea8d6 [SPARK-43973][SS][UI] Structured Streaming UI should 
display failed queries correctly
 add 1df1d7661a3 [SPARK-43516][ML][PYTHON] Update MLv2 Transformer 
interfaces

No new revisions were added by this update.

Summary of changes:
 python/pyspark/mlv2/base.py   | 24 -
 python/pyspark/mlv2/feature.py| 16 +++---
 python/pyspark/mlv2/tests/test_feature.py |  6 ++
 python/pyspark/mlv2/util.py   | 35 ---
 4 files changed, 51 insertions(+), 30 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (c618dabc96a -> fc3489d8bb6)

2023-06-05 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c618dabc96a Revert "[SPARK-43911][SQL] Use toSet to deduplicate the 
iterator data to prevent the creation of large Array"
 add fc3489d8bb6 [SPARK-43783][SPARK-43784][SPARK-43788][ML] Make MLv2 (ML 
on spark connect) supports pandas >= 2.0

No new revisions were added by this update.

Summary of changes:
 python/pyspark/mlv2/summarizer.py|  2 +-
 python/pyspark/mlv2/tests/test_feature.py| 10 --
 python/pyspark/mlv2/tests/test_summarizer.py |  6 --
 3 files changed, 1 insertion(+), 17 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect

2023-06-02 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c3b62708cd6 [SPARK-43516][ML][FOLLOW-UP] Drop vector type support in 
Distributed ML for spark connect
c3b62708cd6 is described below

commit c3b62708cd6371da08943e572a9bc0a45c1dcced
Author: Weichen Xu 
AuthorDate: Fri Jun 2 20:28:01 2023 +0800

[SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for 
spark connect

### What changes were proposed in this pull request?

Drop vector type support in Distributed ML for spark connect.

### Why are the changes needed?

Distributed ML is designed to support fitting / transforming over either a
Spark dataframe or a local pandas dataframe.
Currently pandas dataframes have no vector type comparable to
`spark.ml.linalg.Vector`, and the Vector type has little advantage beyond
storing sparse feature datasets.

To keep the interface consistent, we decided that the initial version does not
support the vector type.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.

Closes #41420 from WeichenXu123/mlv2-drop-vector-type-support.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/mlv2/base.py  |  6 +-
 python/pyspark/mlv2/tests/test_feature.py| 16 +++-
 python/pyspark/mlv2/tests/test_summarizer.py | 10 --
 python/pyspark/mlv2/util.py  | 19 ---
 4 files changed, 16 insertions(+), 35 deletions(-)

diff --git a/python/pyspark/mlv2/base.py b/python/pyspark/mlv2/base.py
index dc503db71c0..63631eccf2f 100644
--- a/python/pyspark/mlv2/base.py
+++ b/python/pyspark/mlv2/base.py
@@ -134,7 +134,11 @@ class Transformer(Params, metaclass=ABCMeta):
 def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
 """
 Return a transformation function that accepts an instance of 
`pd.Series` as input and
-returns transformed result as an instance of `pd.Series` or 
`pd.DataFrame`
+returns transformed result as an instance of `pd.Series` or 
`pd.DataFrame`.
+If there's only one output column, the transformed result must be an
+instance of `pd.Series`, if there are multiple output columns, the 
transformed result
+must be an instance of `pd.DataFrame` with column names matching 
output schema
+returned by  `_output_columns` interface.
 """
 raise NotImplementedError()
 
diff --git a/python/pyspark/mlv2/tests/test_feature.py 
b/python/pyspark/mlv2/tests/test_feature.py
index 8bc9d4c2307..eed04217a6f 100644
--- a/python/pyspark/mlv2/tests/test_feature.py
+++ b/python/pyspark/mlv2/tests/test_feature.py
@@ -21,8 +21,6 @@ from distutils.version import LooseVersion
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.functions import vector_to_array
-from pyspark.ml.linalg import Vectors
 from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
 from pyspark.sql import SparkSession
 
@@ -35,8 +33,8 @@ class FeatureTestsMixin:
 def test_max_abs_scaler(self):
 df1 = self.spark.createDataFrame(
 [
-(Vectors.dense([2.0, 3.5, 1.5]),),
-(Vectors.dense([-3.0, -0.5, -2.5]),),
+([2.0, 3.5, 1.5],),
+([-3.0, -0.5, -2.5],),
 ],
 schema=["features"],
 )
@@ -49,7 +47,7 @@ class FeatureTestsMixin:
 
 np.testing.assert_allclose(list(result.scaled_features), 
expected_result)
 
-local_df1 = df1.withColumn("features", 
vector_to_array("features")).toPandas()
+local_df1 = df1.toPandas()
 local_fit_model = scaler.fit(local_df1)
 local_transform_result = local_fit_model.transform(local_df1)
 
@@ -62,9 +60,9 @@ class FeatureTestsMixin:
 def test_standard_scaler(self):
 df1 = self.spark.createDataFrame(
 [
-(Vectors.dense([2.0, 3.5, 1.5]),),
-(Vectors.dense([-3.0, -0.5, -2.5]),),
-(Vectors.dense([1.0, -1.5, 0.5]),),
+([2.0, 3.5, 1.5],),
+([-3.0, -0.5, -2.5],),
+([1.0, -1.5, 0.5],),
 ],
 schema=["features"],
 )
@@ -81,7 +79,7 @@ class FeatureTestsMixin:
 
 np.testing.assert_allclose(list(result.scaled_features), 
expected_result)
 
-local_df1 = df1.withColumn("features", 
vector_to_array("features")).toPandas()
+local_df1 = df1.toPandas()
 local_fit_model = scaler.fit(local_df1)
 local_transform_result = local_fit_model.transform(local_df1)
 
diff --git a/python/

[spark] branch master updated: [SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid duplicated log to stdout redirection

2023-06-01 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 480b14f4b45 [SPARK-41593][FOLLOW-UP][ML] Torch distributor log 
streaming server: Avoid duplicated log to stdout redirection
480b14f4b45 is described below

commit 480b14f4b45a4e01e7e4bdea82475d4c77a6b89f
Author: Weichen Xu 
AuthorDate: Thu Jun 1 16:26:49 2023 +0800

[SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid 
duplicated log to stdout redirection

### What changes were proposed in this pull request?

Torch distributor log streaming server: Avoid duplicated log to stdout 
redirection.

In some cases (typically Spark local mode), the remote tasks run on the same
node as the Spark driver. In this case, both the torch process created by the
Spark task and the driver-side torch distributor log streaming server redirect
logs to STDOUT, which makes STDOUT print duplicate logs. This PR fixes that
case.

### Why are the changes needed?

In some cases (typically Spark local mode), the remote tasks run on the same
node as the Spark driver. In this case, both the torch process created by the
Spark task and the driver-side torch distributor log streaming server redirect
logs to STDOUT, which makes STDOUT print duplicate logs. This PR fixes that
case.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT.

Closes #41404 from WeichenXu123/torch-distributor-avoid-dup-log.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/torch/distributor.py | 17 -
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/ml/torch/distributor.py 
b/python/pyspark/ml/torch/distributor.py
index 711f76db09b..2af92f8399f 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -462,7 +462,22 @@ class TorchDistributor(Distributor):
 decoded = line.decode()
 tail.append(decoded)
 if redirect_to_stdout:
-sys.stdout.write(decoded)
+if (
+log_streaming_client
+and not log_streaming_client.failed
+and (
+log_streaming_client.sock.getsockname()[0]
+== log_streaming_client.sock.getpeername()[0]
+)
+):
+# If log_streaming_client and log_stream_server are in 
the same
+# node (typical case is spark local mode),
+# server side will redirect the log to STDOUT,
+# to avoid STDOUT outputs duplication, skip redirecting
+# logs to STDOUT in client side.
+pass
+else:
+sys.stdout.write(decoded)
 if log_streaming_client:
 log_streaming_client.send(decoded.rstrip())
 task.wait()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43081][ML][FOLLOW-UP] Improve torch distributor data loader code

2023-05-31 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c2060e7c0a3 [SPARK-43081][ML][FOLLOW-UP] Improve torch distributor 
data loader code
c2060e7c0a3 is described below

commit c2060e7c0a332c20f527adeb34a52042237430e4
Author: Weichen Xu 
AuthorDate: Wed May 31 16:34:19 2023 +0800

[SPARK-43081][ML][FOLLOW-UP] Improve torch distributor data loader code

### What changes were proposed in this pull request?

### Why are the changes needed?

Improve torch distributor data loader code:

* Add a verification that num_processes matches the number of partitions of
the input Spark dataframe. This makes debugging easier when users pass a
mismatched input dataframe; otherwise the torch package raises obscure errors.
* Improve column value conversion in the torch dataloader: avoid repeating the
field-type check for every column value.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.

Closes #41382 from WeichenXu123/improve-torch-dataloader.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/torch/data.py| 50 --
 python/pyspark/ml/torch/distributor.py |  6 
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/ml/torch/data.py b/python/pyspark/ml/torch/data.py
index a52b96a9392..d421683c16d 100644
--- a/python/pyspark/ml/torch/data.py
+++ b/python/pyspark/ml/torch/data.py
@@ -17,7 +17,7 @@
 
 import torch
 import numpy as np
-from typing import Any, Iterator
+from typing import Any, Callable, Iterator
 from pyspark.sql.types import StructType
 
 
@@ -26,27 +26,37 @@ class 
_SparkPartitionTorchDataset(torch.utils.data.IterableDataset):
 self.arrow_file_path = arrow_file_path
 self.num_samples = num_samples
 self.field_types = [field.dataType.simpleString() for field in schema]
+self.field_converters = [
+_SparkPartitionTorchDataset._get_field_converter(field_type)
+for field_type in self.field_types
+]
 
 @staticmethod
-def _extract_field_value(value: Any, field_type: str) -> Any:
-# TODO: avoid checking field type for every row.
+def _get_field_converter(field_type: str) -> Callable[[Any], Any]:
 if field_type == "vector":
-if value["type"] == 1:
-# dense vector
-return value["values"]
-if value["type"] == 0:
-# sparse vector
-size = int(value["size"])
-sparse_array = np.zeros(size, dtype=np.float64)
-sparse_array[value["indices"]] = value["values"]
-return sparse_array
-if field_type in ["float", "double", "int", "bigint", "smallint"]:
-return value
 
-raise ValueError(
-"SparkPartitionTorchDataset does not support loading data from 
field of "
-f"type {field_type}."
-)
+def converter(value: Any) -> Any:
+if value["type"] == 1:
+# dense vector
+return value["values"]
+if value["type"] == 0:
+# sparse vector
+size = int(value["size"])
+sparse_array = np.zeros(size, dtype=np.float64)
+sparse_array[value["indices"]] = value["values"]
+return sparse_array
+
+elif field_type in ["float", "double", "int", "bigint", "smallint"]:
+
+def converter(value: Any) -> Any:
+return value
+
+else:
+raise ValueError(
+"SparkPartitionTorchDataset does not support loading data from 
field of "
+f"type {field_type}."
+)
+return converter
 
 def __iter__(self) -> Iterator[Any]:
 from pyspark.sql.pandas.serializers import ArrowStreamSerializer
@@ -71,8 +81,8 @@ class 
_SparkPartitionTorchDataset(torch.utils.data.IterableDataset):
 batch_pdf = batch.to_pandas()
 for row in batch_pdf.itertuples(index=False):
 yield [
-
_SparkPartitionTorchDataset._extract_field_value(value, field_type)
-for value, field_type in zip(row, self.field_types)
+field_converter(value)
+for value, field_converter 

[spark] branch master updated: [SPARK-41593][FOLLOW-UP] Fix the case torch distributor logging server not shut down

2023-05-30 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e1619653895 [SPARK-41593][FOLLOW-UP] Fix the case torch distributor 
logging server not shut down
e1619653895 is described below

commit e1619653895b4d5e11d7121bdb7906355d8c17bf
Author: Weichen Xu 
AuthorDate: Tue May 30 19:13:20 2023 +0800

[SPARK-41593][FOLLOW-UP] Fix the case torch distributor logging server not 
shut down

### What changes were proposed in this pull request?

Fix the case where the torch distributor logging server is not shut down.

`_get_spark_task_function` and `_check_encryption` might raise an exception;
in that case the logging server must be shut down, but currently it is not.
This PR fixes that case.
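
The shape of the fix, as a sketch with hypothetical helper names (the real
change only moves the preparation calls inside the existing try block so that
the server shutdown handled further down in the method always runs):

```python
server = start_log_streaming_server()   # hypothetical stand-in for the real server
try:
    # Anything that may raise -- including building the Spark task function and
    # the encryption check -- must happen inside the try block.
    task_fn = build_spark_task_function()   # hypothetical
    check_encryption()                      # hypothetical
    run_distributed_training(task_fn)       # hypothetical
finally:
    # The server is now shut down even when the preparation steps raise.
    server.shutdown()
```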

### Why are the changes needed?

Fix the case where the torch distributor logging server is not shut down.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests.

Closes #41375 from 
WeichenXu123/improve-torch-distributor-log-server-exception-handling.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/torch/distributor.py | 26 +-
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/ml/torch/distributor.py 
b/python/pyspark/ml/torch/distributor.py
index ad8b4d8cc25..0249e6b4b2c 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -665,20 +665,20 @@ class TorchDistributor(Distributor):
 time.sleep(1)  # wait for the server to start
 self.log_streaming_server_port = log_streaming_server.port
 
-spark_task_function = self._get_spark_task_function(
-framework_wrapper_fn, train_object, spark_dataframe, *args, 
**kwargs
-)
-self._check_encryption()
-self.logger.info(
-f"Started distributed training with {self.num_processes} executor 
processes"
-)
-if spark_dataframe is not None:
-input_df = spark_dataframe
-else:
-input_df = self.spark.range(
-start=0, end=self.num_tasks, step=1, 
numPartitions=self.num_tasks
-)
 try:
+spark_task_function = self._get_spark_task_function(
+framework_wrapper_fn, train_object, spark_dataframe, *args, 
**kwargs
+)
+self._check_encryption()
+self.logger.info(
+f"Started distributed training with {self.num_processes} 
executor processes"
+)
+if spark_dataframe is not None:
+input_df = spark_dataframe
+else:
+input_df = self.spark.range(
+start=0, end=self.num_tasks, step=1, 
numPartitions=self.num_tasks
+)
 rows = input_df.mapInArrow(
 func=spark_task_function, schema="chunk binary", barrier=True
 ).collect()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (7c7b9585a2a -> 0e8e4ae47fb)

2023-05-24 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 7c7b9585a2a [SPARK-43546][PYTHON][CONNECT][TESTS] Complete parity 
tests of Pandas UDF
 add 0e8e4ae47fb [SPARK-43516][ML][PYTHON][CONNECT] Base interfaces of 
sparkML for spark3.5: estimator/transformer/model/evaluator

No new revisions were added by this update.

Summary of changes:
 dev/infra/Dockerfile   |   2 +-
 dev/requirements.txt   |   2 +
 dev/sparktestsupport/modules.py|   6 +
 python/mypy.ini|   3 +
 python/pyspark/mlv2/__init__.py|  36 
 python/pyspark/mlv2/base.py| 240 +
 python/pyspark/mlv2/evaluation.py  |  88 
 python/pyspark/mlv2/feature.py | 124 +++
 python/pyspark/mlv2/summarizer.py  | 118 ++
 .../mlv2/tests/connect/test_parity_evaluation.py   |  47 
 .../mlv2/tests/connect/test_parity_feature.py  |  40 
 .../mlv2/tests/connect/test_parity_summarizer.py   |  40 
 python/pyspark/mlv2/tests/test_evaluation.py   |  88 
 python/pyspark/mlv2/tests/test_feature.py  |  98 +
 python/pyspark/mlv2/tests/test_summarizer.py   |  80 +++
 python/pyspark/mlv2/util.py| 192 +
 16 files changed, 1203 insertions(+), 1 deletion(-)
 create mode 100644 python/pyspark/mlv2/__init__.py
 create mode 100644 python/pyspark/mlv2/base.py
 create mode 100644 python/pyspark/mlv2/evaluation.py
 create mode 100644 python/pyspark/mlv2/feature.py
 create mode 100644 python/pyspark/mlv2/summarizer.py
 create mode 100644 python/pyspark/mlv2/tests/connect/test_parity_evaluation.py
 create mode 100644 python/pyspark/mlv2/tests/connect/test_parity_feature.py
 create mode 100644 python/pyspark/mlv2/tests/connect/test_parity_summarizer.py
 create mode 100644 python/pyspark/mlv2/tests/test_evaluation.py
 create mode 100644 python/pyspark/mlv2/tests/test_feature.py
 create mode 100644 python/pyspark/mlv2/tests/test_summarizer.py
 create mode 100644 python/pyspark/mlv2/util.py


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (aed6a47580e -> abd864766b0)

2023-04-30 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from aed6a47580e [SPARK-43320][SQL][HIVE] Directly call Hive 2.3.9 API
 add abd864766b0 [SPARK-43081][ML][CONNECT] Add torch distributor data 
loader that loads data from spark partition data

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py|   2 +
 .../connect/test_parity_torch_data_loader.py}  |  26 ++-
 python/pyspark/ml/torch/data.py|  79 +++
 python/pyspark/ml/torch/distributor.py | 259 ++---
 python/pyspark/ml/torch/tests/test_data_loader.py  | 136 +++
 python/pyspark/ml/torch/tests/test_distributor.py  |   2 +-
 6 files changed, 461 insertions(+), 43 deletions(-)
 copy python/pyspark/{pandas/tests/connect/test_parity_dataframe_spark_io.py => 
ml/tests/connect/test_parity_torch_data_loader.py} (61%)
 create mode 100644 python/pyspark/ml/torch/data.py
 create mode 100644 python/pyspark/ml/torch/tests/test_data_loader.py


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-42929] make mapInPandas / mapInArrow support "is_barrier"

2023-03-27 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a1ac07132b [SPARK-42929] make mapInPandas / mapInArrow support 
"is_barrier"
2a1ac07132b is described below

commit 2a1ac07132b7abc13e56b9a632b3dece7e4b60ea
Author: Weichen Xu 
AuthorDate: Mon Mar 27 17:50:23 2023 +0800

[SPARK-42929] make mapInPandas / mapInArrow support "is_barrier"

### What changes were proposed in this pull request?

make mapInPandas / mapInArrow support "is_barrier"

### Why are the changes needed?

feature parity.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Manually:

`bin/pyspark --remote local`:

```
import pyarrow  # needed by the mapInArrow example below
from pyspark.sql.functions import pandas_udf
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, df.schema,  is_barrier=True).collect()

def filter_func(iterator):
for batch in iterator:
pdf = batch.to_pandas()
yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1])

df.mapInArrow(filter_func, df.schema, is_barrier=True).collect()
```

Closes #40559 from WeichenXu123/spark-connect-barrier-mode.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../main/protobuf/spark/connect/relations.proto|  3 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  5 ++--
 python/pyspark/sql/connect/dataframe.py| 21 +++
 python/pyspark/sql/connect/plan.py |  8 +-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 24 -
 python/pyspark/sql/connect/proto/relations_pb2.pyi | 31 --
 6 files changed, 70 insertions(+), 22 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 976bd68e7fe..c965a6c8d32 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -794,6 +794,9 @@ message MapPartitions {
 
   // (Required) Input user-defined function.
   CommonInlineUserDefinedFunction func = 2;
+
+  // (Optional) isBarrier.
+  optional bool is_barrier = 3;
 }
 
 message GroupMap {
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index e7911ccdf11..e7e88cab643 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -484,19 +484,20 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
 val commonUdf = rel.getFunc
 val pythonUdf = transformPythonUDF(commonUdf)
+val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
 pythonUdf.evalType match {
   case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
 logical.MapInPandas(
   pythonUdf,
   pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
   transformRelation(rel.getInput),
-  false)
+  isBarrier)
   case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
 logical.PythonMapInArrow(
   pythonUdf,
   pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
   transformRelation(rel.getInput),
-  false)
+  isBarrier)
   case _ =>
 throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} 
is not supported")
 }
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 2dfc8e72193..10426c3c28d 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1623,6 +1623,7 @@ class DataFrame:
 func: "PandasMapIterFunction",
 schema: Union[StructType, str],
 evalType: int,
+is_barrier: bool,
 ) -> "DataFrame":
 from pyspark.sql.connect.udf import UserDefinedFunction
 
@@ -1636,21 +1637,31 @@ class DataFrame:
 )
 
 return DataFrame.withPlan(
-plan.MapPartitions(child=self._plan, function=udf_obj, 
cols=self.columns),
+plan.MapPartitions(
+child=self._plan, function=udf_obj, cols=self.columns, 
is_barrier=is_b

[spark] branch master updated: [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support barrier mode execution

2023-03-26 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 06bf544973f [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / 
`mapInArrow` support barrier mode execution
06bf544973f is described below

commit 06bf544973f4e221c569487473fbe3268543ebb7
Author: Weichen Xu 
AuthorDate: Mon Mar 27 09:39:48 2023 +0800

[SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support 
barrier mode execution

### What changes were proposed in this pull request?

Make mapInPandas / mapInArrow support barrier mode execution

### Why are the changes needed?

This is the preparation PR for supporting mapInPandas / mapInArrow barrier 
execution in spark connect mode. The feature is required by machine learning 
use cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #40520 from WeichenXu123/barrier-udf.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +++--
 python/pyspark/sql/pandas/map_ops.py   | 26 ++
 .../catalyst/analysis/DeduplicateRelations.scala   |  4 ++--
 .../plans/logical/pythonLogicalOperators.scala |  6 +++--
 .../sql/catalyst/analysis/AnalysisSuite.scala  |  3 ++-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 10 +
 .../spark/sql/execution/SparkStrategies.scala  |  8 +++
 .../sql/execution/python/MapInBatchExec.scala  | 10 -
 .../sql/execution/python/MapInPandasExec.scala |  3 ++-
 .../execution/python/PythonMapInArrowExec.scala|  3 ++-
 10 files changed, 57 insertions(+), 22 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 13052ec9b01..e7911ccdf11 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -489,12 +489,14 @@ class SparkConnectPlanner(val session: SparkSession) {
 logical.MapInPandas(
   pythonUdf,
   pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-  transformRelation(rel.getInput))
+  transformRelation(rel.getInput),
+  false)
   case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
 logical.PythonMapInArrow(
   pythonUdf,
   pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-  transformRelation(rel.getInput))
+  transformRelation(rel.getInput),
+  false)
   case _ =>
 throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} 
is not supported")
 }
diff --git a/python/pyspark/sql/pandas/map_ops.py 
b/python/pyspark/sql/pandas/map_ops.py
index 47b17578ae1..a4c0c94844b 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -32,7 +32,7 @@ class PandasMapOpsMixin:
 """
 
 def mapInPandas(
-self, func: "PandasMapIterFunction", schema: Union[StructType, str]
+self, func: "PandasMapIterFunction", schema: Union[StructType, str], 
isBarrier: bool = False
 ) -> "DataFrame":
 """
 Maps an iterator of batches in the current :class:`DataFrame` using a 
Python native
@@ -60,6 +60,7 @@ class PandasMapOpsMixin:
 schema : :class:`pyspark.sql.types.DataType` or str
 the return type of the `func` in PySpark. The value can be either a
 :class:`pyspark.sql.types.DataType` object or a DDL-formatted type 
string.
+isBarrier : Use barrier mode execution if True.
 
 Examples
 
@@ -74,6 +75,14 @@ class PandasMapOpsMixin:
 +---+---+
 |  1| 21|
 +---+---+
+>>> # Set isBarrier=True to force the "mapInPandas" stage running in 
barrier mode,
+>>> # it ensures all python UDF workers in the stage will be launched 
concurrently.
+>>> df.mapInPandas(filter_func, df.schema, isBarrier=True).show()  # 
doctest: +SKIP
++---+---+
+| id|age|
++---+---+
+|  1| 21|
++---+---+
 
 Notes
 -
@@ -93,11 +102,11 @@ class PandasMapOpsMixin:
 func, returnType=schema, 
functionType=PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
 )  # type: ignore[call-overload]
 udf_column = udf(*[self[col] for col in self.columns])
-jdf = self._jdf.mapInPandas(udf_column._jc.expr())
+jdf = 

[spark] branch master updated: [SPARK-42732][PYSPARK][CONNECT] Support spark connect session getActiveSession method

2023-03-14 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 753864fedee [SPARK-42732][PYSPARK][CONNECT] Support spark connect 
session getActiveSession method
753864fedee is described below

commit 753864fedee62f638354040063d95b2b3ba93d46
Author: Weichen Xu 
AuthorDate: Tue Mar 14 18:33:24 2023 +0800

[SPARK-42732][PYSPARK][CONNECT] Support spark connect session 
getActiveSession method

### What changes were proposed in this pull request?

Support the Spark Connect session `getActiveSession` method.

Spark Connect ML needs this API to get the active session in some cases (e.g.
fetching model attributes from the server side).
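
A usage sketch under Spark Connect, assuming a reachable Connect endpoint
(`sc://localhost` is a placeholder) and that `getActiveSession` is exposed on
the Connect `SparkSession` as the description states:

```python
from pyspark.sql.connect.session import SparkSession

# getOrCreate() caches the session in a module-level variable, so ML code
# running later in the same process can retrieve it again.
spark = SparkSession.builder.config("spark.remote", "sc://localhost").getOrCreate()

active = SparkSession.getActiveSession()
assert active is spark

spark.stop()  # stop() also clears the cached active session
```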

### Why are the changes needed?

Manually.

### Does this PR introduce _any_ user-facing change?

Yes. Implemented 
`pyspark.sql.connect.session.SparkSession.getActiveSession` API.

### How was this patch tested?

N/A

Closes #40353 from WeichenXu123/spark-connect-get-active-session.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/sql/connect/session.py | 13 -
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/session.py 
b/python/pyspark/sql/connect/session.py
index 5e7c8361d80..ffa139eba3e 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -74,6 +74,11 @@ if TYPE_CHECKING:
 from pyspark.sql.connect.udf import UDFRegistration
 
 
+# `_active_spark_session` stores the active spark connect session created by
+# `SparkSession.builder.getOrCreate`. It is used by ML code.
+_active_spark_session = None
+
+
 class SparkSession:
 class Builder:
 """Builder for :class:`SparkSession`."""
@@ -119,7 +124,11 @@ class SparkSession:
 raise NotImplementedError("enableHiveSupport not implemented for 
Spark Connect")
 
 def getOrCreate(self) -> "SparkSession":
-return SparkSession(connectionString=self._options["spark.remote"])
+global _active_spark_session
+if _active_spark_session is not None:
+return _active_spark_session
+_active_spark_session = 
SparkSession(connectionString=self._options["spark.remote"])
+return _active_spark_session
 
 _client: SparkConnectClient
 
@@ -434,7 +443,9 @@ class SparkSession:
 # specifically in Spark Connect the Spark Connect server is designed 
for
 # multi-tenancy - the remote client side cannot just stop the server 
and stop
 # other remote clients being used from other users.
+global _active_spark_session
 self.client.close()
+_active_spark_session = None
 
 if "SPARK_LOCAL_REMOTE" in os.environ:
 # When local mode is in use, follow the regular Spark session's


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (b414b895ffd -> d5b08f8d99b)

2023-01-16 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b414b895ffd [SPARK-41994] Assign SQLSTATE's (1/2)
 add d5b08f8d99b [SPARK-40264][ML] add batch_infer_udf function to 
pyspark.ml.functions

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py|   2 +
 python/pyspark/ml/functions.py | 637 +
 python/pyspark/ml/model_cache.py   |  55 ++
 python/pyspark/ml/tests/test_functions.py  | 501 
 .../tests/test_model_cache.py} |  35 +-
 5 files changed, 1218 insertions(+), 12 deletions(-)
 create mode 100644 python/pyspark/ml/model_cache.py
 create mode 100644 python/pyspark/ml/tests/test_functions.py
 copy python/pyspark/{sql/tests/connect/test_parity_datasources.py => 
ml/tests/test_model_cache.py} (54%)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode

2023-01-10 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8871f6dfb82 [SPARK-41949][CORE][PYTHON] Make stage scheduling support 
local-cluster mode
8871f6dfb82 is described below

commit 8871f6dfb82acd7f44dfa19fc24c877abe3a2fe3
Author: Weichen Xu 
AuthorDate: Tue Jan 10 21:05:44 2023 +0800

[SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Make stage scheduling support local-cluster mode.

### Why are the changes needed?

This is useful in testing, especially for test code of third-party Python
libraries that depend on pyspark: many tests are written with pytest, and
pytest is hard to integrate with a standalone Spark cluster.
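
A hedged sketch of the kind of pytest-friendly test this enables; the master
URL, resource amounts, and dataset are illustrative, and the stage-level
scheduling calls use the existing `pyspark.resource` API:

```python
from pyspark.sql import SparkSession
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

# local-cluster[2,1,1024]: 2 workers, 1 core and 1024 MB each -- easy to spin up
# inside a pytest process, unlike a real standalone cluster.
spark = (
    SparkSession.builder
    .master("local-cluster[2,1,1024]")
    .appName("stage-sched-local-cluster")
    .getOrCreate()
)

# Request one CPU per task for this particular stage.
rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(1)).build

rdd = spark.sparkContext.parallelize(range(8), 2).withResources(rp)
assert rdd.map(lambda x: x * x).collect() == [x * x for x in range(8)]

spark.stop()
```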

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests

Closes #39424 from WeichenXu123/stage-sched-local-cluster.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../spark/resource/ResourceProfileManager.scala|  12 +-
 dev/sparktestsupport/modules.py|   1 +
 python/pyspark/tests/test_stage_sched.py   | 153 +
 3 files changed, 161 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 3f48aaded5c..9f98d4d9c9c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -54,7 +54,9 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
   private val isK8s = master.isDefined && master.get.startsWith("k8s://")
-  private val isStandalone = master.isDefined && 
master.get.startsWith("spark://")
+  private val isStandaloneOrLocalCluster = master.isDefined && (
+  master.get.startsWith("spark://") || 
master.get.startsWith("local-cluster")
+)
   private val notRunningUnitTests = !isTesting
   private val testExceptionThrown = 
sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
@@ -65,16 +67,16 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
*/
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
 if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-  if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+  if ((notRunningUnitTests || testExceptionThrown) && 
!isStandaloneOrLocalCluster) {
 throw new SparkException("TaskResourceProfiles are only supported for 
Standalone " +
   "cluster for now when dynamic allocation is disabled.")
   }
 } else {
   val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
   val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+isNotDefaultProfile && !(isYarn || isK8s || isStandaloneOrLocalCluster)
   val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-isNotDefaultProfile && (isYarn || isK8s || isStandalone) && 
!dynamicEnabled
+isNotDefaultProfile && (isYarn || isK8s || isStandaloneOrLocalCluster) 
&& !dynamicEnabled
 
   // We want the exception to be thrown only when we are specifically 
testing for the
   // exception or in a real application. Otherwise in all other testing 
scenarios we want
@@ -86,7 +88,7 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
   "and Standalone with dynamic allocation enabled.")
   }
 
-  if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+  if (isStandaloneOrLocalCluster && dynamicEnabled && 
rp.getExecutorCores.isEmpty &&
 sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
 logWarning("Neither executor cores is set for resource profile, nor 
spark.executor.cores " +
   "is explicitly set, you may get more executors allocated than 
expected. " +
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 655c539cfae..5df495096b7 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -430,6 +430,7 @@ pyspark_core = Module(
 "pyspark.tests.test_taskcontext&q

[spark] branch branch-3.2 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

2022-11-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 3b9cca7aa32 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
3b9cca7aa32 is described below

commit 3b9cca7aa32f659ea7413abeb373fe7ed069e6f7
Author: Weichen Xu 
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

[SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.

### Why are the changes needed?

This limits the number of threads used by OpenBLAS routines to the number of
cores assigned to this executor, because some Spark ML algorithms call
OpenBLAS via netlib-java.
For example, Spark ALS estimator training calls the LAPACK API `dppsv` (which
internally calls a BLAS library). If that library is OpenBLAS, it will by
default try to use all CPU cores. But Spark launches multiple tasks on a
worker, and each task might call the `dppsv` API at the same time, with each
call internally creating as many threads as there are CPU cores; this causes
CPU oversubscription.
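
The new default only applies when the key is absent from the conf, so it can
still be overridden explicitly; a minimal sketch (values are illustrative):

```python
from pyspark.sql import SparkSession

# Explicitly setting spark.executorEnv.OMP_NUM_THREADS wins over the new
# default, which is only applied when this key is missing from the Spark conf.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.task.cpus", "2")                    # default OMP_NUM_THREADS would be 2
    .config("spark.executorEnv.OMP_NUM_THREADS", "4")  # explicit override
    .getOrCreate()
)
```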

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

Closes #38699 from WeichenXu123/SPARK-41188.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e)
Signed-off-by: Weichen Xu 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 ---
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7d8c799029..f991d2ea09c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -542,6 +542,16 @@ class SparkContext(config: SparkConf) extends Logging {
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 
+if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+  // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+  // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+  // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+  // see https://github.com/numpy/numpy/issues/10455
+  executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+}
+
 _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
 _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
   _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 3a3e7e04e7f..d854874c0e8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -131,13 +131,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
 val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
 val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-// if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-  // see https://github.com/numpy/numpy/issues/10455
-  execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-}
 envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
 

[spark] branch branch-3.3 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

2022-11-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new f431cdf0944 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
f431cdf0944 is described below

commit f431cdf09442b86133d17fa6e56cb8b77ca4e486
Author: Weichen Xu 
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

[SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.

### Why are the changes needed?

This limits the number of threads used by OpenBLAS routines to the number of cores assigned to the executor, because some Spark ML algorithms call OpenBLAS via netlib-java.
For example:
Spark ALS estimator training calls the LAPACK API `dppsv` (which internally calls a BLAS library). If that library is OpenBLAS, it will by default try to use all CPU cores, but Spark launches multiple tasks on a worker and each task might call `dppsv` at the same time. Each call then spawns as many threads as there are CPU cores, which causes CPU oversubscription.
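Below is a minimal sketch (the application name is illustrative) of how a user can still pin the thread count explicitly; when `spark.executorEnv.OMP_NUM_THREADS` is set, the default derived from `spark.task.cpus` is not applied:

```
from pyspark.sql import SparkSession

# Explicitly pin the OpenMP / OpenBLAS thread count for executors.
# If spark.executorEnv.OMP_NUM_THREADS is left unset, executors now
# default OMP_NUM_THREADS to the value of spark.task.cpus.
spark = (
    SparkSession.builder
    .appName("omp-num-threads-demo")  # hypothetical application name
    .config("spark.task.cpus", "2")
    .config("spark.executorEnv.OMP_NUM_THREADS", "2")
    .getOrCreate()
)
```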

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

Closes #38699 from WeichenXu123/SPARK-41188.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e)
Signed-off-by: Weichen Xu 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 ---
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 02c58d2a9b4..0d0d4fe83a4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -546,6 +546,16 @@ class SparkContext(config: SparkConf) extends Logging {
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 
+if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+  // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+  // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+  // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+  // see https://github.com/numpy/numpy/issues/10455
+  executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+}
+
 _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
 _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
   _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f32c80f3ef5..bf5b862438a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -133,13 +133,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
 val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
 val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-// if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-  // see https://github.com/numpy/numpy/issues/10455
-  execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-}
 envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
 

[spark] branch master updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

2022-11-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 82a41d8ca27 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
82a41d8ca27 is described below

commit 82a41d8ca273e7a9268324c6958f8bb14d9e
Author: Weichen Xu 
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

[SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.

### Why are the changes needed?

This limits the number of threads used by OpenBLAS routines to the number of cores assigned to the executor, because some Spark ML algorithms call OpenBLAS via netlib-java.
For example:
Spark ALS estimator training calls the LAPACK API `dppsv` (which internally calls a BLAS library). If that library is OpenBLAS, it will by default try to use all CPU cores, but Spark launches multiple tasks on a worker and each task might call `dppsv` at the same time. Each call then spawns as many threads as there are CPU cores, which causes CPU oversubscription.
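As a rough way to check the effect on a real cluster (not part of this patch; in local mode the executor environment is not populated this way), one could read the variable from inside a task:

```
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def read_omp_threads(_):
    # Reports OMP_NUM_THREADS as seen by the Python worker on an executor.
    yield os.environ.get("OMP_NUM_THREADS", "<unset>")

print(spark.sparkContext.parallelize([0], numSlices=1)
      .mapPartitions(read_omp_threads).collect())
```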

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

Closes #38699 from WeichenXu123/SPARK-41188.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 ---
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d2c7067e596..5cbf2e83371 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -541,6 +541,16 @@ class SparkContext(config: SparkConf) extends Logging {
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 
+if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+  // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+  // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+  // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+  // see https://github.com/numpy/numpy/issues/10455
+  executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+}
+
 _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
 _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
   _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 14d5df14ed8..cdb2c620656 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -135,13 +135,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
 val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
 val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-// if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-  // see https://github.com/numpy/numpy/issues/10455
-  execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-}
 envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
 if (reuseWorker) {
   envVars.put("SPARK_REUSE_WORKER", "1")
diff --git 
a/resource

[spark] branch branch-3.1 updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it

2022-08-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new a1a2534a01f [SPARK-35542][ML] Fix: Bucketizer created for multiple 
columns with parameters splitsArray,  inputCols and outputCols can not be 
loaded after saving it
a1a2534a01f is described below

commit a1a2534a01fc776ad043129f3d33795c0f365347
Author: Weichen Xu 
AuthorDate: Fri Aug 19 12:26:34 2022 +0800

[SPARK-35542][ML] Fix: Bucketizer created for multiple columns with 
parameters splitsArray,  inputCols and outputCols can not be loaded after 
saving it

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Fix: a Bucketizer created for multiple columns with the parameters splitsArray, inputCols and outputCols cannot be loaded after saving it.
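A minimal reproduction sketch (column names and the save path are illustrative); before this fix, loading failed because the nested-list `splitsArray` param was not converted back from its Java representation:

```
from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

splits = [
    [-float("inf"), 0.5, float("inf")],
    [-float("inf"), 0.1, float("inf")],
]
bk = Bucketizer(splitsArray=splits, inputCols=["v1", "v2"], outputCols=["b1", "b2"])
bk.write().overwrite().save("/tmp/bucketizer-demo")  # hypothetical path
loaded = Bucketizer.load("/tmp/bucketizer-demo")     # failed before this fix
assert loaded.getSplitsArray() == splits
```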

### Why are the changes needed?
Bugfix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #37568 from WeichenXu123/SPARK-35542.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 876ce6a5df118095de51c3c4789d6db6da95eb23)
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/tests/test_persistence.py | 17 -
 python/pyspark/ml/wrapper.py|  6 +-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/tests/test_persistence.py 
b/python/pyspark/ml/tests/test_persistence.py
index 77a6c030962..8680cb55b2f 100644
--- a/python/pyspark/ml/tests/test_persistence.py
+++ b/python/pyspark/ml/tests/test_persistence.py
@@ -25,7 +25,7 @@ from pyspark.ml.classification import DecisionTreeClassifier, 
FMClassifier, \
 FMClassificationModel, LogisticRegression, MultilayerPerceptronClassifier, 
\
 MultilayerPerceptronClassificationModel, OneVsRest, OneVsRestModel
 from pyspark.ml.clustering import KMeans
-from pyspark.ml.feature import Binarizer, HashingTF, PCA
+from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA
 from pyspark.ml.linalg import Vectors
 from pyspark.ml.param import Params
 from pyspark.ml.pipeline import Pipeline, PipelineModel
@@ -461,6 +461,21 @@ class PersistenceTest(SparkSessionTestCase):
 loadedMetadata = reader._parseMetaData(metadataStr, )
 reader.getAndSetParams(lr, loadedMetadata)
 
+# Test for SPARK-35542 fix.
+def test_save_and_load_on_nested_list_params(self):
+temp_path = tempfile.mkdtemp()
+splitsArray = [
+[-float("inf"), 0.5, 1.4, float("inf")],
+[-float("inf"), 0.1, 1.2, float("inf")],
+]
+bucketizer = Bucketizer(
+splitsArray=splitsArray, inputCols=["values", "values"], 
outputCols=["b1", "b2"]
+)
+savePath = temp_path + "/bk"
+bucketizer.write().overwrite().save(savePath)
+loadedBucketizer = Bucketizer.load(savePath)
+assert loadedBucketizer.getSplitsArray() == splitsArray
+
 
 if __name__ == "__main__":
 from pyspark.ml.tests.test_persistence import *  # noqa: F401
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 3c8dc03a829..d21d5b92add 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -185,7 +185,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta):
 java_param = self._java_obj.getParam(param.name)
 # SPARK-14931: Only check set params back to avoid default 
params mismatch.
 if self._java_obj.isSet(java_param):
-value = _java2py(sc, 
self._java_obj.getOrDefault(java_param))
+java_value = self._java_obj.getOrDefault(java_param)
+if param.typeConverter.__name__.startswith("toList"):
+value = [_java2py(sc, x) for x in list(java_value)]
+else:
+value = _java2py(sc, java_value)
 self._set(**{param.name: value})
 # SPARK-10931: Temporary fix for params that have a default in 
Java
 if self._java_obj.hasDefault(java_param) and not 
self.isDefined(param):





[spark] branch branch-3.2 updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it

2022-08-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 3943427b847 [SPARK-35542][ML] Fix: Bucketizer created for multiple 
columns with parameters splitsArray,  inputCols and outputCols can not be 
loaded after saving it
3943427b847 is described below

commit 3943427b8478d613c25f643553f8731318b0a445
Author: Weichen Xu 
AuthorDate: Fri Aug 19 12:26:34 2022 +0800

[SPARK-35542][ML] Fix: Bucketizer created for multiple columns with 
parameters splitsArray,  inputCols and outputCols can not be loaded after 
saving it

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Fix: a Bucketizer created for multiple columns with the parameters splitsArray, inputCols and outputCols cannot be loaded after saving it.

### Why are the changes needed?
Bugfix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #37568 from WeichenXu123/SPARK-35542.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 876ce6a5df118095de51c3c4789d6db6da95eb23)
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/tests/test_persistence.py | 17 -
 python/pyspark/ml/wrapper.py|  6 +-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/tests/test_persistence.py 
b/python/pyspark/ml/tests/test_persistence.py
index 77a6c030962..8680cb55b2f 100644
--- a/python/pyspark/ml/tests/test_persistence.py
+++ b/python/pyspark/ml/tests/test_persistence.py
@@ -25,7 +25,7 @@ from pyspark.ml.classification import DecisionTreeClassifier, 
FMClassifier, \
 FMClassificationModel, LogisticRegression, MultilayerPerceptronClassifier, 
\
 MultilayerPerceptronClassificationModel, OneVsRest, OneVsRestModel
 from pyspark.ml.clustering import KMeans
-from pyspark.ml.feature import Binarizer, HashingTF, PCA
+from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA
 from pyspark.ml.linalg import Vectors
 from pyspark.ml.param import Params
 from pyspark.ml.pipeline import Pipeline, PipelineModel
@@ -461,6 +461,21 @@ class PersistenceTest(SparkSessionTestCase):
 loadedMetadata = reader._parseMetaData(metadataStr, )
 reader.getAndSetParams(lr, loadedMetadata)
 
+# Test for SPARK-35542 fix.
+def test_save_and_load_on_nested_list_params(self):
+temp_path = tempfile.mkdtemp()
+splitsArray = [
+[-float("inf"), 0.5, 1.4, float("inf")],
+[-float("inf"), 0.1, 1.2, float("inf")],
+]
+bucketizer = Bucketizer(
+splitsArray=splitsArray, inputCols=["values", "values"], 
outputCols=["b1", "b2"]
+)
+savePath = temp_path + "/bk"
+bucketizer.write().overwrite().save(savePath)
+loadedBucketizer = Bucketizer.load(savePath)
+assert loadedBucketizer.getSplitsArray() == splitsArray
+
 
 if __name__ == "__main__":
 from pyspark.ml.tests.test_persistence import *  # noqa: F401
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 3c8dc03a829..d21d5b92add 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -185,7 +185,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta):
 java_param = self._java_obj.getParam(param.name)
 # SPARK-14931: Only check set params back to avoid default 
params mismatch.
 if self._java_obj.isSet(java_param):
-value = _java2py(sc, 
self._java_obj.getOrDefault(java_param))
+java_value = self._java_obj.getOrDefault(java_param)
+if param.typeConverter.__name__.startswith("toList"):
+value = [_java2py(sc, x) for x in list(java_value)]
+else:
+value = _java2py(sc, java_value)
 self._set(**{param.name: value})
 # SPARK-10931: Temporary fix for params that have a default in 
Java
 if self._java_obj.hasDefault(java_param) and not 
self.isDefined(param):





[spark] branch branch-3.3 updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it

2022-08-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 87f957dea86 [SPARK-35542][ML] Fix: Bucketizer created for multiple 
columns with parameters splitsArray,  inputCols and outputCols can not be 
loaded after saving it
87f957dea86 is described below

commit 87f957dea86fe1b8c5979e499b5400866b235e43
Author: Weichen Xu 
AuthorDate: Fri Aug 19 12:26:34 2022 +0800

[SPARK-35542][ML] Fix: Bucketizer created for multiple columns with 
parameters splitsArray,  inputCols and outputCols can not be loaded after 
saving it

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Fix: a Bucketizer created for multiple columns with the parameters splitsArray, inputCols and outputCols cannot be loaded after saving it.

### Why are the changes needed?
Bugfix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #37568 from WeichenXu123/SPARK-35542.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 876ce6a5df118095de51c3c4789d6db6da95eb23)
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/tests/test_persistence.py | 17 -
 python/pyspark/ml/wrapper.py|  6 +-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/tests/test_persistence.py 
b/python/pyspark/ml/tests/test_persistence.py
index 4f09a49dd04..0b54540f06d 100644
--- a/python/pyspark/ml/tests/test_persistence.py
+++ b/python/pyspark/ml/tests/test_persistence.py
@@ -32,7 +32,7 @@ from pyspark.ml.classification import (
 OneVsRestModel,
 )
 from pyspark.ml.clustering import KMeans
-from pyspark.ml.feature import Binarizer, HashingTF, PCA
+from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA
 from pyspark.ml.linalg import Vectors
 from pyspark.ml.param import Params
 from pyspark.ml.pipeline import Pipeline, PipelineModel
@@ -518,6 +518,21 @@ class PersistenceTest(SparkSessionTestCase):
 )
 reader.getAndSetParams(lr, loadedMetadata)
 
+# Test for SPARK-35542 fix.
+def test_save_and_load_on_nested_list_params(self):
+temp_path = tempfile.mkdtemp()
+splitsArray = [
+[-float("inf"), 0.5, 1.4, float("inf")],
+[-float("inf"), 0.1, 1.2, float("inf")],
+]
+bucketizer = Bucketizer(
+splitsArray=splitsArray, inputCols=["values", "values"], 
outputCols=["b1", "b2"]
+)
+savePath = temp_path + "/bk"
+bucketizer.write().overwrite().save(savePath)
+loadedBucketizer = Bucketizer.load(savePath)
+assert loadedBucketizer.getSplitsArray() == splitsArray
+
 
 if __name__ == "__main__":
 from pyspark.ml.tests.test_persistence import *  # noqa: F401
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 7853e766244..32856540d6d 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -220,7 +220,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta):
 java_param = self._java_obj.getParam(param.name)
 # SPARK-14931: Only check set params back to avoid default 
params mismatch.
 if self._java_obj.isSet(java_param):
-value = _java2py(sc, 
self._java_obj.getOrDefault(java_param))
+java_value = self._java_obj.getOrDefault(java_param)
+if param.typeConverter.__name__.startswith("toList"):
+value = [_java2py(sc, x) for x in list(java_value)]
+else:
+value = _java2py(sc, java_value)
 self._set(**{param.name: value})
 # SPARK-10931: Temporary fix for params that have a default in 
Java
 if self._java_obj.hasDefault(java_param) and not 
self.isDefined(param):





[spark] branch master updated: [SPARK-35542][ML] Fix: Bucketizer created for multiple columns with parameters splitsArray, inputCols and outputCols can not be loaded after saving it

2022-08-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 876ce6a5df1 [SPARK-35542][ML] Fix: Bucketizer created for multiple 
columns with parameters splitsArray,  inputCols and outputCols can not be 
loaded after saving it
876ce6a5df1 is described below

commit 876ce6a5df118095de51c3c4789d6db6da95eb23
Author: Weichen Xu 
AuthorDate: Fri Aug 19 12:26:34 2022 +0800

[SPARK-35542][ML] Fix: Bucketizer created for multiple columns with 
parameters splitsArray,  inputCols and outputCols can not be loaded after 
saving it

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Fix: a Bucketizer created for multiple columns with the parameters splitsArray, inputCols and outputCols cannot be loaded after saving it.

### Why are the changes needed?
Bugfix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #37568 from WeichenXu123/SPARK-35542.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/tests/test_persistence.py | 17 -
 python/pyspark/ml/wrapper.py|  6 +-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/tests/test_persistence.py 
b/python/pyspark/ml/tests/test_persistence.py
index 4f09a49dd04..0b54540f06d 100644
--- a/python/pyspark/ml/tests/test_persistence.py
+++ b/python/pyspark/ml/tests/test_persistence.py
@@ -32,7 +32,7 @@ from pyspark.ml.classification import (
 OneVsRestModel,
 )
 from pyspark.ml.clustering import KMeans
-from pyspark.ml.feature import Binarizer, HashingTF, PCA
+from pyspark.ml.feature import Binarizer, Bucketizer, HashingTF, PCA
 from pyspark.ml.linalg import Vectors
 from pyspark.ml.param import Params
 from pyspark.ml.pipeline import Pipeline, PipelineModel
@@ -518,6 +518,21 @@ class PersistenceTest(SparkSessionTestCase):
 )
 reader.getAndSetParams(lr, loadedMetadata)
 
+# Test for SPARK-35542 fix.
+def test_save_and_load_on_nested_list_params(self):
+temp_path = tempfile.mkdtemp()
+splitsArray = [
+[-float("inf"), 0.5, 1.4, float("inf")],
+[-float("inf"), 0.1, 1.2, float("inf")],
+]
+bucketizer = Bucketizer(
+splitsArray=splitsArray, inputCols=["values", "values"], 
outputCols=["b1", "b2"]
+)
+savePath = temp_path + "/bk"
+bucketizer.write().overwrite().save(savePath)
+loadedBucketizer = Bucketizer.load(savePath)
+assert loadedBucketizer.getSplitsArray() == splitsArray
+
 
 if __name__ == "__main__":
 from pyspark.ml.tests.test_persistence import *  # noqa: F401
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 39685ea631e..a83ed4c3d4b 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -220,7 +220,11 @@ class JavaParams(JavaWrapper, Params, metaclass=ABCMeta):
 java_param = self._java_obj.getParam(param.name)
 # SPARK-14931: Only check set params back to avoid default 
params mismatch.
 if self._java_obj.isSet(java_param):
-value = _java2py(sc, 
self._java_obj.getOrDefault(java_param))
+java_value = self._java_obj.getOrDefault(java_param)
+if param.typeConverter.__name__.startswith("toList"):
+value = [_java2py(sc, x) for x in list(java_value)]
+else:
+value = _java2py(sc, java_value)
 self._set(**{param.name: value})
 # SPARK-10931: Temporary fix for params that have a default in 
Java
 if self._java_obj.hasDefault(java_param) and not 
self.isDefined(param):





[spark] branch branch-3.1 updated: [SPARK-40079] Add Imputer inputCols validation for empty input case

2022-08-15 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new f2453e8a129 [SPARK-40079] Add Imputer inputCols validation for empty 
input case
f2453e8a129 is described below

commit f2453e8a1293b367c2d7794e7f37978cc848aebc
Author: Weichen Xu 
AuthorDate: Mon Aug 15 18:03:08 2022 +0800

[SPARK-40079] Add Imputer inputCols validation for empty input case

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Add Imputer inputCols validation for empty input case

### Why are the changes needed?
If Imputer inputCols is empty, `fit` works fine, but an error is raised when the model is saved:

>
AnalysisException:
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
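A sketch of the failure mode (data and path are illustrative); with this change the problem is caught earlier, at `fit` time, instead of surfacing only when the model is written out:

```
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (float("nan"),)], ["a"])

imputer = Imputer(strategy="mean", inputCols=[], outputCols=[])
model = imputer.fit(df)   # before: succeeded silently; now: "inputCols cannot be empty"
model.write().overwrite().save("/tmp/imputer-demo")  # before: AnalysisException raised here
```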

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 87094f89655b7df09cdecb47c653461ae855b0ac)
Signed-off-by: Weichen Xu 
---
 mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala |  1 +
 .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala  | 10 ++
 2 files changed, 11 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
index 71403acc91b..5998887923f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with 
HasInputCol with HasInp
   protected def validateAndTransformSchema(schema: StructType): StructType = {
 ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
 val (inputColNames, outputColNames) = getInOutCols()
+require(inputColNames.length > 0, "inputCols cannot be empty")
 require(inputColNames.length == inputColNames.distinct.length, s"inputCols 
contains" +
   s" duplicates: (${inputColNames.mkString(", ")})")
 require(outputColNames.length == outputColNames.distinct.length, 
s"outputCols contains" +
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index 30887f55638..5ef22a282c3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with 
DefaultReadWriteTest {
 }
 assert(e.getMessage.contains("outputCols contains duplicates"))
   }
+
+  withClue("Imputer should fail if inputCols param is empty.") {
+val e: IllegalArgumentException = intercept[IllegalArgumentException] {
+  val imputer = new Imputer().setStrategy(strategy)
+.setInputCols(Array[String]())
+.setOutputCols(Array[String]())
+  val model = imputer.fit(df)
+}
+assert(e.getMessage.contains("inputCols cannot be empty"))
+  }
 }
   }
 





[spark] branch branch-3.2 updated: [SPARK-40079] Add Imputer inputCols validation for empty input case

2022-08-15 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 2b54b48cd85 [SPARK-40079] Add Imputer inputCols validation for empty 
input case
2b54b48cd85 is described below

commit 2b54b48cd852f93e8cf24397df6f3ec5b755233e
Author: Weichen Xu 
AuthorDate: Mon Aug 15 18:03:08 2022 +0800

[SPARK-40079] Add Imputer inputCols validation for empty input case

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Add Imputer inputCols validation for empty input case

### Why are the changes needed?
If Imputer inputCols is empty, `fit` works fine, but an error is raised when the model is saved:

>
AnalysisException:
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 87094f89655b7df09cdecb47c653461ae855b0ac)
Signed-off-by: Weichen Xu 
---
 mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala |  1 +
 .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala  | 10 ++
 2 files changed, 11 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
index 71403acc91b..5998887923f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with 
HasInputCol with HasInp
   protected def validateAndTransformSchema(schema: StructType): StructType = {
 ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
 val (inputColNames, outputColNames) = getInOutCols()
+require(inputColNames.length > 0, "inputCols cannot be empty")
 require(inputColNames.length == inputColNames.distinct.length, s"inputCols 
contains" +
   s" duplicates: (${inputColNames.mkString(", ")})")
 require(outputColNames.length == outputColNames.distinct.length, 
s"outputCols contains" +
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index 30887f55638..5ef22a282c3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with 
DefaultReadWriteTest {
 }
 assert(e.getMessage.contains("outputCols contains duplicates"))
   }
+
+  withClue("Imputer should fail if inputCols param is empty.") {
+val e: IllegalArgumentException = intercept[IllegalArgumentException] {
+  val imputer = new Imputer().setStrategy(strategy)
+.setInputCols(Array[String]())
+.setOutputCols(Array[String]())
+  val model = imputer.fit(df)
+}
+assert(e.getMessage.contains("inputCols cannot be empty"))
+  }
 }
   }
 





[spark] branch branch-3.3 updated: [SPARK-40079] Add Imputer inputCols validation for empty input case

2022-08-15 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 2ee196dbb0b [SPARK-40079] Add Imputer inputCols validation for empty 
input case
2ee196dbb0b is described below

commit 2ee196dbb0bf9ecfd96a1928cbaf15b7c3856d3d
Author: Weichen Xu 
AuthorDate: Mon Aug 15 18:03:08 2022 +0800

[SPARK-40079] Add Imputer inputCols validation for empty input case

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Add Imputer inputCols validation for empty input case

### Why are the changes needed?
If Imputer inputCols is empty, `fit` works fine, but an error is raised when the model is saved:

>
AnalysisException:
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 87094f89655b7df09cdecb47c653461ae855b0ac)
Signed-off-by: Weichen Xu 
---
 mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala |  1 +
 .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala  | 10 ++
 2 files changed, 11 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
index 71403acc91b..5998887923f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with 
HasInputCol with HasInp
   protected def validateAndTransformSchema(schema: StructType): StructType = {
 ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
 val (inputColNames, outputColNames) = getInOutCols()
+require(inputColNames.length > 0, "inputCols cannot be empty")
 require(inputColNames.length == inputColNames.distinct.length, s"inputCols 
contains" +
   s" duplicates: (${inputColNames.mkString(", ")})")
 require(outputColNames.length == outputColNames.distinct.length, 
s"outputCols contains" +
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index 30887f55638..5ef22a282c3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with 
DefaultReadWriteTest {
 }
 assert(e.getMessage.contains("outputCols contains duplicates"))
   }
+
+  withClue("Imputer should fail if inputCols param is empty.") {
+val e: IllegalArgumentException = intercept[IllegalArgumentException] {
+  val imputer = new Imputer().setStrategy(strategy)
+.setInputCols(Array[String]())
+.setOutputCols(Array[String]())
+  val model = imputer.fit(df)
+}
+assert(e.getMessage.contains("inputCols cannot be empty"))
+  }
 }
   }
 





[spark] branch master updated: [SPARK-40079] Add Imputer inputCols validation for empty input case

2022-08-15 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 87094f89655 [SPARK-40079] Add Imputer inputCols validation for empty 
input case
87094f89655 is described below

commit 87094f89655b7df09cdecb47c653461ae855b0ac
Author: Weichen Xu 
AuthorDate: Mon Aug 15 18:03:08 2022 +0800

[SPARK-40079] Add Imputer inputCols validation for empty input case

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?
Add Imputer inputCols validation for empty input case

### Why are the changes needed?
If Imputer inputCols is empty, `fit` works fine, but an error is raised when the model is saved:

>
AnalysisException:
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #37518 from WeichenXu123/imputer-param-validation.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala |  1 +
 .../test/scala/org/apache/spark/ml/feature/ImputerSuite.scala  | 10 ++
 2 files changed, 11 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
index 71403acc91b..5998887923f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -81,6 +81,7 @@ private[feature] trait ImputerParams extends Params with 
HasInputCol with HasInp
   protected def validateAndTransformSchema(schema: StructType): StructType = {
 ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
 val (inputColNames, outputColNames) = getInOutCols()
+require(inputColNames.length > 0, "inputCols cannot be empty")
 require(inputColNames.length == inputColNames.distinct.length, s"inputCols 
contains" +
   s" duplicates: (${inputColNames.mkString(", ")})")
 require(outputColNames.length == outputColNames.distinct.length, 
s"outputCols contains" +
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index 30887f55638..5ef22a282c3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -268,6 +268,16 @@ class ImputerSuite extends MLTest with 
DefaultReadWriteTest {
 }
 assert(e.getMessage.contains("outputCols contains duplicates"))
   }
+
+  withClue("Imputer should fail if inputCols param is empty.") {
+val e: IllegalArgumentException = intercept[IllegalArgumentException] {
+  val imputer = new Imputer().setStrategy(strategy)
+.setInputCols(Array[String]())
+.setOutputCols(Array[String]())
+  val model = imputer.fit(df)
+}
+assert(e.getMessage.contains("inputCols cannot be empty"))
+  }
 }
   }
 





[spark] branch master updated: [SPARK-39071][SQL][PYTHON] Add unwrap_udt function for unwrapping UserDefinedType columns

2022-05-10 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new daedcd29630 [SPARK-39071][SQL][PYTHON] Add unwrap_udt function for 
unwrapping UserDefinedType columns
daedcd29630 is described below

commit daedcd29630de42f6d0858e1e693f4a3d9caf1aa
Author: Weichen Xu 
AuthorDate: Tue May 10 19:29:05 2022 +0800

[SPARK-39071][SQL][PYTHON] Add unwrap_udt function for unwrapping 
UserDefinedType columns

### What changes were proposed in this pull request?
Add unwrap_udt function for unwrapping UserDefinedType columns

### Why are the changes needed?
This is useful in the open-source project https://github.com/mengxr/pyspark-xgboost.

### Does this PR introduce _any_ user-facing change?
Yes.
A new SQL function, `unwrap_udt`, has been added.
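A brief usage sketch (the data is illustrative): `unwrap_udt` turns a `VectorUDT` column into its underlying struct of `(type, size, indices, values)` so it can be processed with ordinary SQL functions:

```
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import unwrap_udt

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(Vectors.dense(1.0, 2.0),), (Vectors.sparse(2, {1: 3.0}),)], ["vec"]
)
# Each vector becomes a struct with fields (type, size, indices, values).
df.select(unwrap_udt("vec").alias("raw")).show(truncate=False)
```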

### How was this patch tested?
Unit test.

Closes #36408 from WeichenXu123/unwrapt_udt.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/tests/test_linalg.py | 14 +++
 python/pyspark/sql/functions.py| 10 +
 .../spark/sql/catalyst/expressions/UnwrapUDT.scala | 49 ++
 .../scala/org/apache/spark/sql/functions.scala |  9 
 .../apache/spark/sql/UserDefinedTypeSuite.scala| 10 +
 5 files changed, 92 insertions(+)

diff --git a/python/pyspark/ml/tests/test_linalg.py 
b/python/pyspark/ml/tests/test_linalg.py
index dfdd32e98eb..a6e9f4e752e 100644
--- a/python/pyspark/ml/tests/test_linalg.py
+++ b/python/pyspark/ml/tests/test_linalg.py
@@ -33,6 +33,7 @@ from pyspark.ml.linalg import (
 )
 from pyspark.testing.mllibutils import MLlibTestCase
 from pyspark.sql import Row
+from pyspark.sql.functions import unwrap_udt
 
 
 class VectorTests(MLlibTestCase):
@@ -351,6 +352,19 @@ class VectorUDTTests(MLlibTestCase):
 else:
 raise TypeError("expecting a vector but got %r of type %r" % 
(v, type(v)))
 
+def test_unwrap_udt(self):
+df = self.spark.createDataFrame(
+[(Vectors.dense(1.0, 2.0, 3.0),), (Vectors.sparse(3, {1: 1.0, 2: 
5.5}),)],
+["vec"],
+)
+results = df.select(unwrap_udt("vec").alias("v2")).collect()
+unwrapped_vec = Row("type", "size", "indices", "values")
+expected = [
+Row(v2=unwrapped_vec(1, None, None, [1.0, 2.0, 3.0])),
+Row(v2=unwrapped_vec(0, 3, [1, 2], [1.0, 5.5])),
+]
+self.assertEquals(results, expected)
+
 
 class MatrixUDTTests(MLlibTestCase):
 
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 019f64b5171..aa9aa5ed51b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -5318,6 +5318,16 @@ def bucket(numBuckets: Union[Column, int], col: 
"ColumnOrName") -> Column:
 return _invoke_function("bucket", numBuckets, _to_java_column(col))
 
 
+def unwrap_udt(col: "ColumnOrName") -> Column:
+"""
+Unwrap UDT data type column into its underlying type.
+
+.. versionadded:: 3.4.0
+
+"""
+return _invoke_function("unwrap_udt", _to_java_column(col))
+
+
 #  User Defined Function 
--
 
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnwrapUDT.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnwrapUDT.scala
new file mode 100644
index 000..cb740672af3
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnwrapUDT.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spar

[spark] branch master updated: [SPARK-36642][SQL] Add df.withMetadata pyspark API

2021-09-23 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ef7441b  [SPARK-36642][SQL] Add df.withMetadata pyspark API
ef7441b is described below

commit ef7441bb4245e2fb08d70a8025a2c6d8bebfe75b
Author: Liang Zhang 
AuthorDate: Fri Sep 24 08:30:18 2021 +0800

[SPARK-36642][SQL] Add df.withMetadata pyspark API

This PR adds the PySpark API `df.withMetadata(columnName, metadata)`. The Scala API was added in https://github.com/apache/spark/pull/33853.

### What changes were proposed in this pull request?

To make it easy to use and modify the semantic annotation, we want a shorter API for updating the metadata of a dataframe column. Currently we have `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))` to update the metadata without changing the column name, which is too verbose. We want a syntactic-sugar API `df.withMetadata("col1", metadata=metadata)` that achieves the same functionality.
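A short sketch of the intended usage (column name and metadata dict are illustrative), contrasting the existing verbose form with the proposed shorthand:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["col1"])
metadata = {"semantic_type": "categorical"}  # hypothetical annotation

# Existing, verbose form: re-alias the column just to attach metadata.
df1 = df.withColumn("col1", col("col1").alias("col1", metadata=metadata))

# Proposed shorthand with the same effect.
df2 = df.withMetadata("col1", metadata)

assert df2.schema["col1"].metadata == metadata
```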

### Why are the changes needed?

Some background on why the metadata is updated frequently: we are working on inferring semantic data types, using them in AutoML, and storing the semantic annotation in the metadata. In many cases we will therefore ask the user to update the metadata, either to correct a wrong inference or to add an annotation where the inference is weak.

### Does this PR introduce _any_ user-facing change?

Yes.
A syntactic-sugar API `df.withMetadata("col1", metadata=metadata)` that achieves the same functionality as `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`.

### How was this patch tested?

doctest.

Closes #34021 from liangz1/withMetadataPython.

Authored-by: Liang Zhang 
Signed-off-by: Weichen Xu 
---
 python/pyspark/sql/dataframe.py | 27 +++
 1 file changed, 27 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 081c06e..5a2e8cf 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import json
 import sys
 import random
 import warnings
@@ -23,6 +24,7 @@ from functools import reduce
 from html import escape as html_escape
 
 from pyspark import copy_func, since, _NoValue
+from pyspark.context import SparkContext
 from pyspark.rdd import RDD, _load_from_socket, _local_iterator_from_socket
 from pyspark.serializers import BatchedSerializer, PickleSerializer, \
 UTF8Deserializer
@@ -2536,6 +2538,31 @@ class DataFrame(PandasMapOpsMixin, 
PandasConversionMixin):
 """
 return DataFrame(self._jdf.withColumnRenamed(existing, new), 
self.sql_ctx)
 
+def withMetadata(self, columnName, metadata):
+"""Returns a new :class:`DataFrame` by updating an existing column 
with metadata.
+
+.. versionadded:: 3.3.0
+
+Parameters
+--
+columnName : str
+string, name of the existing column to update the metadata.
+metadata : dict
+dict, new metadata to be assigned to df.schema[columnName].metadata
+
+Examples
+
+>>> df_meta = df.withMetadata('age', {'foo': 'bar'})
+>>> df_meta.schema['age'].metadata
+{'foo': 'bar'}
+"""
+if not isinstance(metadata, dict):
+raise TypeError("metadata should be a dict")
+sc = SparkContext._active_spark_context
+jmeta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson(
+json.dumps(metadata))
+return DataFrame(self._jdf.withMetadata(columnName, jmeta), 
self.sql_ctx)
+
 def drop(self, *cols):
 """Returns a new :class:`DataFrame` that drops the specified column.
 This is a no-op if schema doesn't contain the given column name(s).




[spark] branch master updated: [SPARK-36642][SQL] Add df.withMetadata: a syntax suger to update the metadata of a dataframe

2021-09-07 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cb30683  [SPARK-36642][SQL] Add df.withMetadata: a syntax suger to 
update the metadata of a dataframe
cb30683 is described below

commit cb30683b65714bd1c1ae15d31d1411aeb66cef12
Author: Liang Zhang 
AuthorDate: Wed Sep 8 09:35:18 2021 +0800

[SPARK-36642][SQL] Add df.withMetadata: a syntax suger to update the 
metadata of a dataframe

### What changes were proposed in this pull request?

To make it easy to use and modify the semantic annotation, we want a shorter API for updating the metadata of a dataframe column. Currently we have `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))` to update the metadata without changing the column name, which is too verbose. We want a syntactic-sugar API `df.withMetadata("col1", metadata=metadata)` that achieves the same functionality.

### Why are the changes needed?

Some background on why the metadata is updated frequently: we are working on inferring semantic data types, using them in AutoML, and storing the semantic annotation in the metadata. In many cases we will therefore ask the user to update the metadata, either to correct a wrong inference or to add an annotation where the inference is weak.

### Does this PR introduce _any_ user-facing change?

Yes.
A syntactic-sugar API `df.withMetadata("col1", metadata=metadata)` that achieves the same functionality as `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`.

### How was this patch tested?

A unit test in DataFrameSuite.scala.

Closes #33853 from liangz1/withMetadata.

Authored-by: Liang Zhang 
Signed-off-by: Weichen Xu 
---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   | 10 ++
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 12 
 2 files changed, 22 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index bd3411d..1e85551 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2492,6 +2492,16 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * Returns a new Dataset by updating an existing column with metadata.
+   *
+   * @group untypedrel
+   * @since 3.3.0
+   */
+  def withMetadata(columnName: String, metadata: Metadata): DataFrame = {
+withColumn(columnName, col(columnName), metadata)
+  }
+
+  /**
* Returns a new Dataset with a column dropped. This is a no-op if schema 
doesn't contain
* column name.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index bf161a1..54fb90a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -702,6 +702,18 @@ class DataFrameSuite extends QueryTest
   "The size of column names: 2 isn't equal to the size of metadata 
elements: 1"))
   }
 
+  test("SPARK-36642: withMetadata: replace metadata of a column") {
+val metadata = new MetadataBuilder().putLong("key", 1L).build()
+val df1 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
+val df2 = df1.withMetadata("x", metadata)
+assert(df2.schema(0).metadata === metadata)
+
+val err = intercept[AnalysisException] {
+  df1.withMetadata("x1", metadata)
+}
+assert(err.getMessage.contains("Cannot resolve column name"))
+  }
+
   test("replace column using withColumn") {
 val df2 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
 val df3 = df2.withColumn("x", df2("x") + 1)




[spark] branch branch-3.1 updated: [SPARK-35142][PYTHON][ML] Fix incorrect return type for `rawPredictionUDF` in `OneVsRestModel`

2021-04-21 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 0208810  [SPARK-35142][PYTHON][ML] Fix incorrect return type for 
`rawPredictionUDF` in `OneVsRestModel`
0208810 is described below

commit 0208810b93e234b822bc972f4236bf04bf521e7d
Author: harupy <17039389+har...@users.noreply.github.com>
AuthorDate: Wed Apr 21 16:29:10 2021 +0800

[SPARK-35142][PYTHON][ML] Fix incorrect return type for `rawPredictionUDF` 
in `OneVsRestModel`

### What changes were proposed in this pull request?

Fixes incorrect return type for `rawPredictionUDF` in `OneVsRestModel`.
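For context, a small sketch of the root cause (the data is illustrative): `udf` without an explicit return type defaults to `StringType`, so a function returning a `DenseVector` yields a string column, while passing `VectorUDT()` preserves the vector type:

```
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,)], ["id"])

def make_vec(_):
    return Vectors.dense(0.1, 0.9)

as_string = udf(make_vec)               # default returnType is StringType
as_vector = udf(make_vec, VectorUDT())  # keeps the vector type

df.select(as_string("id")).printSchema()  # column type: string
df.select(as_vector("id")).printSchema()  # column type: vector
```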

### Why are the changes needed?
Bugfix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test.

Closes #32245 from harupy/SPARK-35142.

Authored-by: harupy <17039389+har...@users.noreply.github.com>
Signed-off-by: Weichen Xu 
(cherry picked from commit b6350f5bb00f99a060953850b069a419b70c329e)
Signed-off-by: Weichen Xu 
---
 python/pyspark/ml/classification.py|  4 ++--
 python/pyspark/ml/tests/test_algorithms.py | 14 +-
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index 0553a61..17994ed 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -40,7 +40,7 @@ from pyspark.ml.util import DefaultParamsReader, 
DefaultParamsWriter, \
 from pyspark.ml.wrapper import JavaParams, \
 JavaPredictor, JavaPredictionModel, JavaWrapper
 from pyspark.ml.common import inherit_doc
-from pyspark.ml.linalg import Vectors
+from pyspark.ml.linalg import Vectors, VectorUDT
 from pyspark.sql import DataFrame
 from pyspark.sql.functions import udf, when
 from pyspark.sql.types import ArrayType, DoubleType
@@ -3151,7 +3151,7 @@ class OneVsRestModel(Model, _OneVsRestParams, MLReadable, 
MLWritable):
 predArray.append(x)
 return Vectors.dense(predArray)
 
-rawPredictionUDF = udf(func)
+rawPredictionUDF = udf(func, VectorUDT())
 aggregatedDataset = aggregatedDataset.withColumn(
 self.getRawPredictionCol(), 
rawPredictionUDF(aggregatedDataset[accColName]))
 
diff --git a/python/pyspark/ml/tests/test_algorithms.py 
b/python/pyspark/ml/tests/test_algorithms.py
index 5047521..35ce48b 100644
--- a/python/pyspark/ml/tests/test_algorithms.py
+++ b/python/pyspark/ml/tests/test_algorithms.py
@@ -25,7 +25,7 @@ from pyspark.ml.classification import FMClassifier, 
LogisticRegression, \
 MultilayerPerceptronClassifier, OneVsRest
 from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, 
LDA, LDAModel
 from pyspark.ml.fpm import FPGrowth
-from pyspark.ml.linalg import Matrices, Vectors
+from pyspark.ml.linalg import Matrices, Vectors, DenseVector
 from pyspark.ml.recommendation import ALS
 from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
 from pyspark.sql import Row
@@ -116,6 +116,18 @@ class OneVsRestTests(SparkSessionTestCase):
 output = model.transform(df)
 self.assertEqual(output.columns, ["label", "features", 
"rawPrediction", "prediction"])
 
+def test_raw_prediction_column_is_of_vector_type(self):
+# SPARK-35142: `OneVsRestModel` outputs raw prediction as a string 
column
+df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+ (1.0, Vectors.sparse(2, [], [])),
+ (2.0, Vectors.dense(0.5, 0.5))],
+["label", "features"])
+lr = LogisticRegression(maxIter=5, regParam=0.01)
+ovr = OneVsRest(classifier=lr, parallelism=1)
+model = ovr.fit(df)
+row = model.transform(df).head()
+self.assertIsInstance(row["rawPrediction"], DenseVector)
+
 def test_parallelism_does_not_change_output(self):
 df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
  (1.0, Vectors.sparse(2, [], [])),




[spark] branch master updated (43ad939 -> b6350f5)

2021-04-21 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 43ad939  [SPARK-35152][SQL] ANSI mode: IntegralDivide throws exception 
on overflow
 add b6350f5  [SPARK-35142][PYTHON][ML] Fix incorrect return type for 
`rawPredictionUDF` in `OneVsRestModel`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/classification.py|  4 ++--
 python/pyspark/ml/tests/test_algorithms.py | 14 +-
 2 files changed, 15 insertions(+), 3 deletions(-)




[spark] branch branch-3.1 updated: [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: cleanup

2021-01-25 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 4cf94f3  [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: 
cleanup
4cf94f3 is described below

commit 4cf94f3e33de968b0048690cb0128437736d60a7
Author: Ruifeng Zheng 
AuthorDate: Tue Jan 26 11:57:28 2021 +0800

[SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: cleanup

### What changes were proposed in this pull request?
1. make `silhouette` a method;
2. change the return type of `setDistanceMeasure` to `this.type`.

### Why are the changes needed?
see comments in https://github.com/apache/spark/pull/28590

### Does this PR introduce _any_ user-facing change?
No, 3.1 has not been released

### How was this patch tested?
existing testsuites

Closes #31334 from zhengruifeng/31768-followup.

Authored-by: Ruifeng Zheng 
Signed-off-by: Weichen Xu 
(cherry picked from commit cb37c962bec25083a67d65387ed88e7d4ee556ca)
Signed-off-by: Weichen Xu 
---
 .../scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
index 3dea244..3035688 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
@@ -37,13 +37,18 @@ class ClusteringMetrics private[spark](dataset: Dataset[_]) 
{
 
   def getDistanceMeasure: String = distanceMeasure
 
-  def setDistanceMeasure(value: String) : Unit = distanceMeasure = value
+  def setDistanceMeasure(value: String) : this.type = {
+require(value.equalsIgnoreCase("squaredEuclidean") ||
+  value.equalsIgnoreCase("cosine"))
+distanceMeasure = value
+this
+  }
 
   /**
* Returns the silhouette score
*/
   @Since("3.1.0")
-  lazy val silhouette: Double = {
+  def silhouette(): Double = {
 val columns = dataset.columns.toSeq
 if (distanceMeasure.equalsIgnoreCase("squaredEuclidean")) {
   SquaredEuclideanSilhouette.computeSilhouetteScore(
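
For context, a PySpark usage sketch (not part of this Scala cleanup; assumes an active SparkSession): the silhouette computed by `ClusteringMetrics` is what `ClusteringEvaluator` reports, and the allowed distance measures mirror the `require(...)` check added above.

```
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([0.1, 0.1]),),
     (Vectors.dense([9.0, 9.0]),), (Vectors.dense([9.1, 9.1]),)],
    ["features"])

predictions = KMeans(k=2, seed=1).fit(df).transform(df)

# Only "squaredEuclidean" and "cosine" are valid distance measures.
evaluator = ClusteringEvaluator(distanceMeasure="squaredEuclidean")
print(evaluator.evaluate(predictions))  # silhouette score
```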


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (d1177b5 -> cb37c96)

2021-01-25 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d1177b5  [SPARK-34192][SQL] Move char padding to write side and remove 
length check on read side too
 add cb37c96  [SPARK-31768][ML][FOLLOWUP] add getMetrics in Evaluators: 
cleanup

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s

2021-01-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 5a93bcb  [MINOR][ML] Increase the timeout for 
StreamingLinearRegressionSuite to 60s
5a93bcb is described below

commit 5a93bcbcc27d7d2f1e816e466478f67d4e3df105
Author: Liang Zhang 
AuthorDate: Wed Jan 20 08:26:52 2021 +0800

[MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s

### What changes were proposed in this pull request?

Increase the timeout for StreamingLinearRegressionSuite to 60s to deflake 
the test.

### Why are the changes needed?

Reduce merge conflict.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #31248 from liangz1/increase-timeout.

Authored-by: Liang Zhang 
Signed-off-by: Weichen Xu 
(cherry picked from commit f7ff7ff0a5a251d7983d933dbe8c20882356e5bf)
Signed-off-by: Weichen Xu 
---
 .../apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index 8e2d7d1..b8342f8 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -31,7 +31,7 @@ class StreamingLinearRegressionSuite
   with TestSuiteBase {
 
   // use longer wait time to ensure job completion
-  override def maxWaitTimeMillis: Int = 20000
+  override def maxWaitTimeMillis: Int = 60000
 
   // Assert that two values are equal within tolerance epsilon
   def assertEqual(v1: Double, v2: Double, epsilon: Double): Unit = {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.1 updated: [MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s

2021-01-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 32e61b0  [MINOR][ML] Increase the timeout for 
StreamingLinearRegressionSuite to 60s
32e61b0 is described below

commit 32e61b0a551efac0c1a471a9f8c307e2dec502db
Author: Liang Zhang 
AuthorDate: Wed Jan 20 08:26:52 2021 +0800

[MINOR][ML] Increase the timeout for StreamingLinearRegressionSuite to 60s

### What changes were proposed in this pull request?

Increase the timeout for StreamingLinearRegressionSuite to 60s to deflake 
the test.

### Why are the changes needed?

Reduce merge conflict.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #31248 from liangz1/increase-timeout.

Authored-by: Liang Zhang 
Signed-off-by: Weichen Xu 
(cherry picked from commit f7ff7ff0a5a251d7983d933dbe8c20882356e5bf)
Signed-off-by: Weichen Xu 
---
 .../apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index 8e2d7d1..b8342f8 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -31,7 +31,7 @@ class StreamingLinearRegressionSuite
   with TestSuiteBase {
 
   // use longer wait time to ensure job completion
-  override def maxWaitTimeMillis: Int = 20000
+  override def maxWaitTimeMillis: Int = 60000
 
   // Assert that two values are equal within tolerance epsilon
   def assertEqual(v1: Double, v2: Double, epsilon: Double): Unit = {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (32dad1d -> f7ff7ff)

2021-01-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 32dad1d  [SPARK-34149][SQL] Refresh cache in v2 `ALTER TABLE .. ADD 
PARTITION`
 add f7ff7ff  [MINOR][ML] Increase the timeout for 
StreamingLinearRegressionSuite to 60s

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (66cc129 -> f354883)

2021-01-15 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 66cc129  [SPARK-34132][DOCS][R] Update Roxygen version references to 
7.1.1
 add f354883  [SPARK-34080][ML][PYTHON] Add UnivariateFeatureSelector

No new revisions were added by this update.

Summary of changes:
 docs/ml-features.md| 111 +---
 docs/ml-statistics.md  |  54 +-
 .../spark/examples/ml/JavaANOVATestExample.java|  75 ---
 .../examples/ml/JavaFValueSelectorExample.java |  81 ---
 .../spark/examples/ml/JavaFValueTestExample.java   |  75 ---
 ...a => JavaUnivariateFeatureSelectorExample.java} |  21 +-
 examples/src/main/python/ml/anova_test_example.py  |  50 --
 .../src/main/python/ml/fvalue_selector_example.py  |  53 --
 examples/src/main/python/ml/fvalue_test_example.py |  50 --
 ...e.py => univariate_feature_selector_example.py} |  16 +-
 .../spark/examples/ml/ANOVATestExample.scala   |  63 --
 .../spark/examples/ml/FValueSelectorExample.scala  |  69 ---
 .../spark/examples/ml/FValueTestExample.scala  |  63 --
 ...cala => UnivariateFeatureSelectorExample.scala} |  20 +-
 .../apache/spark/ml/feature/ANOVASelector.scala| 195 --
 .../apache/spark/ml/feature/ChiSqSelector.scala|   1 +
 .../apache/spark/ml/feature/FValueSelector.scala   | 195 --
 .../org/apache/spark/ml/feature/Selector.scala |  12 +-
 .../ml/feature/UnivariateFeatureSelector.scala | 467 ++
 .../scala/org/apache/spark/ml/stat/ANOVATest.scala |   2 +-
 .../org/apache/spark/ml/stat/FValueTest.scala  |   2 +-
 .../spark/ml/feature/ANOVASelectorSuite.scala  | 206 ---
 .../spark/ml/feature/FValueSelectorSuite.scala | 238 ---
 .../feature/UnivariateFeatureSelectorSuite.scala   | 685 +
 python/docs/source/reference/pyspark.ml.rst|   8 +-
 python/pyspark/ml/feature.py   | 449 +++---
 python/pyspark/ml/feature.pyi  | 116 ++--
 python/pyspark/ml/stat.py  | 148 -
 python/pyspark/ml/stat.pyi |  12 -
 29 files changed, 1512 insertions(+), 2025 deletions(-)
 delete mode 100644 
examples/src/main/java/org/apache/spark/examples/ml/JavaANOVATestExample.java
 delete mode 100644 
examples/src/main/java/org/apache/spark/examples/ml/JavaFValueSelectorExample.java
 delete mode 100644 
examples/src/main/java/org/apache/spark/examples/ml/JavaFValueTestExample.java
 rename 
examples/src/main/java/org/apache/spark/examples/ml/{JavaANOVASelectorExample.java
 => JavaUnivariateFeatureSelectorExample.java} (79%)
 delete mode 100644 examples/src/main/python/ml/anova_test_example.py
 delete mode 100644 examples/src/main/python/ml/fvalue_selector_example.py
 delete mode 100644 examples/src/main/python/ml/fvalue_test_example.py
 rename examples/src/main/python/ml/{anova_selector_example.py => 
univariate_feature_selector_example.py} (70%)
 delete mode 100644 
examples/src/main/scala/org/apache/spark/examples/ml/ANOVATestExample.scala
 delete mode 100644 
examples/src/main/scala/org/apache/spark/examples/ml/FValueSelectorExample.scala
 delete mode 100644 
examples/src/main/scala/org/apache/spark/examples/ml/FValueTestExample.scala
 rename 
examples/src/main/scala/org/apache/spark/examples/ml/{ANOVASelectorExample.scala
 => UnivariateFeatureSelectorExample.scala} (76%)
 delete mode 100644 
mllib/src/main/scala/org/apache/spark/ml/feature/ANOVASelector.scala
 delete mode 100644 
mllib/src/main/scala/org/apache/spark/ml/feature/FValueSelector.scala
 create mode 100644 
mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala
 delete mode 100644 
mllib/src/test/scala/org/apache/spark/ml/feature/ANOVASelectorSuite.scala
 delete mode 100644 
mllib/src/test/scala/org/apache/spark/ml/feature/FValueSelectorSuite.scala
 create mode 100644 
mllib/src/test/scala/org/apache/spark/ml/feature/UnivariateFeatureSelectorSuite.scala
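
A short PySpark sketch of the new selector (values are illustrative; assumes a Spark build with this change and an active SparkSession named `spark`). With continuous features and a categorical label it scores features with the ANOVA F-test, covering what `ANOVASelector`/`FValueSelector` did before their removal above.

```
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [(Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0),
     (Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0),
     (Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 1.0)],
    ["features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", labelCol="label",
                                     outputCol="selected",
                                     selectionMode="numTopFeatures")
# The declared feature/label types decide the scoring function (here: ANOVA F-test).
selector.setFeatureType("continuous").setLabelType("categorical") \
    .setSelectionThreshold(1)

selector.fit(df).transform(df).select("selected").show(truncate=False)
```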


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.1 updated: [MINOR][ML] Increase Bounded MLOR (without regularization) test error tolerance

2020-12-08 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new b0a70ab  [MINOR][ML] Increase Bounded MLOR (without regularization) 
test error tolerance
b0a70ab is described below

commit b0a70abed383cedd59f28cec75aa898df0c0b4bd
Author: Weichen Xu 
AuthorDate: Wed Dec 9 11:18:09 2020 +0800

[MINOR][ML] Increase Bounded MLOR (without regularization) test error 
tolerance

### What changes were proposed in this pull request?
Improve LogisticRegression test error tolerance

### Why are the changes needed?
When the BLAS implementation is switched, some tests fail because their error tolerances are too strict.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

Closes #30587 from WeichenXu123/fix_lor_test.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit f021f6d3c72e1c84637798b4ddcb7e208fdfbf46)
Signed-off-by: Weichen Xu 
---
 .../ml/classification/LogisticRegressionSuite.scala | 21 +++--
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index d0b282d..d2814b4 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -1548,9 +1548,9 @@ class LogisticRegressionSuite extends MLTest with 
DefaultReadWriteTest {
 val interceptsExpected1 = Vectors.dense(
   1.152482448372, 3.591773288423673, 5.079685953744937)
 
-checkCoefficientsEquivalent(model1.coefficientMatrix, 
coefficientsExpected1)
+checkBoundedMLORCoefficientsEquivalent(model1.coefficientMatrix, 
coefficientsExpected1)
 assert(model1.interceptVector ~== interceptsExpected1 relTol 0.01)
-checkCoefficientsEquivalent(model2.coefficientMatrix, 
coefficientsExpected1)
+checkBoundedMLORCoefficientsEquivalent(model2.coefficientMatrix, 
coefficientsExpected1)
 assert(model2.interceptVector ~== interceptsExpected1 relTol 0.01)
 
 // Bound constrained optimization with bound on both side.
@@ -1585,9 +1585,9 @@ class LogisticRegressionSuite extends MLTest with 
DefaultReadWriteTest {
   isTransposed = true)
 val interceptsExpected3 = Vectors.dense(1.0, 2.0, 2.0)
 
-checkCoefficientsEquivalent(model3.coefficientMatrix, 
coefficientsExpected3)
+checkBoundedMLORCoefficientsEquivalent(model3.coefficientMatrix, 
coefficientsExpected3)
 assert(model3.interceptVector ~== interceptsExpected3 relTol 0.01)
-checkCoefficientsEquivalent(model4.coefficientMatrix, 
coefficientsExpected3)
+checkBoundedMLORCoefficientsEquivalent(model4.coefficientMatrix, 
coefficientsExpected3)
 assert(model4.interceptVector ~== interceptsExpected3 relTol 0.01)
 
 // Bound constrained optimization with infinite bound on both side.
@@ -1621,9 +1621,9 @@ class LogisticRegressionSuite extends MLTest with 
DefaultReadWriteTest {
 val interceptsExpected5 = Vectors.dense(
   -2.2231282183460723, 0.3669496747012527, 1.856178543644802)
 
-checkCoefficientsEquivalent(model5.coefficientMatrix, 
coefficientsExpected5)
+checkBoundedMLORCoefficientsEquivalent(model5.coefficientMatrix, 
coefficientsExpected5)
 assert(model5.interceptVector ~== interceptsExpected5 relTol 0.01)
-checkCoefficientsEquivalent(model6.coefficientMatrix, 
coefficientsExpected5)
+checkBoundedMLORCoefficientsEquivalent(model6.coefficientMatrix, 
coefficientsExpected5)
 assert(model6.interceptVector ~== interceptsExpected5 relTol 0.01)
   }
 
@@ -1719,9 +1719,9 @@ class LogisticRegressionSuite extends MLTest with 
DefaultReadWriteTest {
   1.7524631428961193, 1.2292565990448736, 1.3433784431904323, 
1.5846063017678864),
   isTransposed = true)
 
-checkCoefficientsEquivalent(model1.coefficientMatrix, coefficientsExpected)
+checkBoundedMLORCoefficientsEquivalent(model1.coefficientMatrix, 
coefficientsExpected)
 assert(model1.interceptVector.toArray === Array.fill(3)(0.0))
-checkCoefficientsEquivalent(model2.coefficientMatrix, coefficientsExpected)
+checkBoundedMLORCoefficientsEquivalent(model2.coefficientMatrix, 
coefficientsExpected)
 assert(model2.interceptVector.toArray === Array.fill(3)(0.0))
   }
 
@@ -2953,16 +2953,17 @@ object LogisticRegressionSuite {
   }
 
   /**
+   * Note: This method is only used in Bounded MLOR (without regularization) 
test
* When no regularization is applied, the multinomial coefficients lack 
identifiability
* because we do not use a pivot class. We can add any constant value

[spark] branch master updated (3ac70f1 -> f021f6d)

2020-12-08 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 3ac70f1  [SPARK-33695][BUILD] Upgrade to jackson to 2.10.5 and 
jackson-databind to 2.10.5.1
 add f021f6d  [MINOR][ML] Increase Bounded MLOR (without regularization) 
test error tolerance

No new revisions were added by this update.

Summary of changes:
 .../ml/classification/LogisticRegressionSuite.scala | 21 +++--
 1 file changed, 11 insertions(+), 10 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-33592][ML][PYTHON][3.0] Backport Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading

2020-12-06 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 8acbe5b  [SPARK-33592][ML][PYTHON][3.0] Backport Fix: Pyspark ML 
Validator params in estimatorParamMaps may be lost after saving and reloading
8acbe5b is described below

commit 8acbe5b822b7b0ad49079aa223ad52afe70b5afa
Author: Weichen Xu 
AuthorDate: Mon Dec 7 11:42:18 2020 +0800

[SPARK-33592][ML][PYTHON][3.0] Backport Fix: Pyspark ML Validator params in 
estimatorParamMaps may be lost after saving and reloading

Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after 
saving and reloading

When saving validator estimatorParamMaps, all nested stages in the tuned estimator are now checked to resolve the correct param parent.

Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), 
outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
   estimatorParamMaps=paramGrid,
   evaluator=MulticlassClassificationEvaluator())

tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)

~~~

~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, 
evaluator=evaluator)

tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)

~~~

Bug fix.

No

Unit test.

Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.

Authored-by: Weichen Xu 
Signed-off-by: Ruifeng Zheng 
(cherry picked from commit 80161238fe9393aabd5fcd56752ff1e43f6989b1)
Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #30590 from WeichenXu123/SPARK-33592-bp-3.0.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 dev/sparktestsupport/modules.py|  1 +
 python/pyspark/ml/classification.py| 48 +-
 python/pyspark/ml/param/__init__.py|  6 +++
 python/pyspark/ml/pipeline.py  | 53 +---
 python/pyspark/ml/tests/test_tuning.py | 47 +++--
 python/pyspark/ml/tests/test_util.py   | 84 +++
 python/pyspark/ml/tuning.py| 92 +++---
 python/pyspark/ml/util.py  | 38 ++
 8 files changed, 261 insertions(+), 108 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 75bdec0..8705d52 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -548,6 +548,7 @@ pyspark_ml = Module(
 "pyspark.ml.tests.test_stat",
 "pyspark.ml.tests.test_training_summary",
 "pyspark.ml.tests.test_tuning",
+"pyspark.ml.tests.test_util",
 "pyspark.ml.tests.test_wrapper",
 ],
 blacklisted_python_implementations=[
diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index c8e15ca..1392bc7 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -27,9 +27,9 @@ from pyspark.ml.tree import _DecisionTreeModel, 
_DecisionTreeParams, \
 _HasVarianceImpurity, _TreeClassifierParams, _TreeEnsembleParams
 from pyspark.ml.regression import _FactorizationMachinesParams, 
DecisionTreeRegressionModel
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, \
+from pyspark.ml.wrapper import JavaParams, \
 JavaPredictor, _JavaPredictorParams, JavaPredictionModel, JavaWrapper
-from pyspark.ml.common import inherit_doc, _java2py, _py2java
+from pyspark.ml.common import inherit_doc
 from pyspark.ml.linalg import Vectors
 from pyspark.sql import DataFrame
 from pyspark.sql.functions import udf, when
@@ -2635,50 +2635,6 @@ class OneVsRest(Estimator, _OneVsRestParams, 
HasParallelism, JavaMLReadable, Jav
 _java_obj.setRawPredictionCol(self.getRawPredictionCol())
 return _java_obj
 
-def _make_java_param_pair(self, param, value):
-"""
-Makes 

[spark] branch master updated (63f9d47 -> 7e759b2)

2020-12-03 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 63f9d47  [SPARK-33634][SQL][TESTS] Use Analyzer in PlanResolutionSuite
 add 7e759b2  [SPARK-33520][ML][PYSPARK] make 
CrossValidator/TrainValidateSplit/OneVsRest Reader/Writer support Python 
backend estimator/evaluator

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/classification.py | 128 +-
 python/pyspark/ml/classification.pyi|  31 ++-
 python/pyspark/ml/tests/test_persistence.py |  14 +-
 python/pyspark/ml/tests/test_tuning.py  |  97 +---
 python/pyspark/ml/tuning.py | 357 ++--
 python/pyspark/ml/tuning.pyi|  40 
 python/pyspark/ml/util.py   |  42 +++-
 python/pyspark/ml/util.pyi  |   2 +
 python/pyspark/testing/mlutils.py   |  87 +++
 9 files changed, 739 insertions(+), 59 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (a180e02 -> 689c294)

2020-11-18 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from a180e02  [SPARK-32852][SQL][DOC][FOLLOWUP] Revise the documentation of 
spark.sql.hive.metastore.jars
 add 689c294  [SPARK-32907][ML][PYTHON] Adaptively blockify instances - 
AFT,LiR,LoR

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/ml/classification/LinearSVC.scala |   6 +-
 .../ml/classification/LogisticRegression.scala | 114 +
 .../spark/ml/optim/aggregator/AFTAggregator.scala  |  42 +++
 .../ml/optim/aggregator/HingeAggregator.scala  |   6 +-
 .../ml/optim/aggregator/HuberAggregator.scala  |  11 +-
 .../optim/aggregator/LeastSquaresAggregator.scala  |  11 +-
 .../ml/optim/aggregator/LogisticAggregator.scala   |  19 +--
 .../ml/param/shared/SharedParamsCodeGen.scala  |   4 +-
 .../spark/ml/param/shared/sharedParams.scala   |   4 +-
 .../ml/regression/AFTSurvivalRegression.scala  | 130 +++
 .../spark/ml/regression/LinearRegression.scala | 138 ++---
 .../mllib/classification/LogisticRegression.scala  |   4 +-
 .../classification/LogisticRegressionSuite.scala   |   8 +-
 .../ml/regression/AFTSurvivalRegressionSuite.scala |   4 +-
 .../ml/regression/LinearRegressionSuite.scala  |   4 +-
 python/pyspark/ml/classification.py|  22 ++--
 python/pyspark/ml/classification.pyi   |   8 +-
 python/pyspark/ml/param/_shared_params_code_gen.py |   4 +-
 python/pyspark/ml/param/shared.py  |   4 +-
 python/pyspark/ml/regression.py|  46 +++
 python/pyspark/ml/regression.pyi   |  18 +--
 21 files changed, 237 insertions(+), 370 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (4335af0 -> a288716)

2020-11-12 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4335af0  [MINOR][DOC] spark.executor.memoryOverhead is not 
cluster-mode only
 add a288716  [SPARK-32907][ML][PYTHON] adaptively blockify instances - 
LinearSVC

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/ml/classification/LinearSVC.scala | 93 +++---
 .../org/apache/spark/ml/feature/Instance.scala | 71 +
 .../ml/param/shared/SharedParamsCodeGen.scala  |  7 +-
 .../spark/ml/param/shared/sharedParams.scala   | 18 +
 .../spark/ml/classification/LinearSVCSuite.scala   |  4 +-
 .../apache/spark/ml/feature/InstanceSuite.scala| 54 +
 python/pyspark/ml/classification.py| 26 +++---
 python/pyspark/ml/classification.pyi   |  9 ++-
 python/pyspark/ml/param/_shared_params_code_gen.py |  6 +-
 python/pyspark/ml/param/shared.py  | 18 +
 python/pyspark/ml/param/shared.pyi |  5 ++
 11 files changed, 224 insertions(+), 87 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] 01/02: init

2020-04-20 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch fix_pipeline_tuning
in repository https://gitbox.apache.org/repos/asf/spark.git

commit c834fe8f335dc74db6346d82b5ce4cf742cba9bb
Author: Weichen Xu 
AuthorDate: Mon Apr 20 17:04:12 2020 +0800

init
---
 python/pyspark/ml/pipeline.py | 46 +--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 09e0748..0004b64 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext
 from pyspark.ml.base import Estimator, Model, Transformer
 from pyspark.ml.param import Param, Params
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaParams
-from pyspark.ml.common import inherit_doc
+from pyspark.ml.wrapper import JavaParams, JavaWrapper
+from pyspark.ml.common import inherit_doc, _java2py, _py2java
 
 
 @inherit_doc
@@ -174,6 +174,48 @@ class Pipeline(Estimator, MLReadable, MLWritable):
 
 return _java_obj
 
+def _make_java_param_pair(self, param, value):
+"""
+Makes a Java param pair.
+"""
+sc = SparkContext._active_spark_context
+param = self._resolveParam(param)
+java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent, 
param.name, param.doc)
+if isinstance(value, JavaParams):
+# used in the case of an estimator having another estimator as a 
parameter
+# the reason why this is not in _py2java in common.py is that 
importing
+# Estimator and Model in common.py results in a circular import 
with inherit_doc
+java_value = value._to_java()
+else:
+java_value = _py2java(sc, value)
+return java_param.w(java_value)
+
+def _transfer_param_map_to_java(self, pyParamMap):
+"""
+Transforms a Python ParamMap into a Java ParamMap.
+"""
+paramMap = 
JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
+for param in self.params:
+if param in pyParamMap:
+pair = self._make_java_param_pair(param, pyParamMap[param])
+paramMap.put([pair])
+return paramMap
+
+def _transfer_param_map_from_java(self, javaParamMap):
+"""
+Transforms a Java ParamMap into a Python ParamMap.
+"""
+sc = SparkContext._active_spark_context
+paramMap = dict()
+for pair in javaParamMap.toList():
+param = pair.param()
+if self.hasParam(str(param.name())):
+if param.name() == "classifier":
+paramMap[self.getParam(param.name())] = 
JavaParams._from_java(pair.value())
+else:
+paramMap[self.getParam(param.name())] = _java2py(sc, 
pair.value())
+return paramMap
+
 
 @inherit_doc
 class PipelineWriter(MLWriter):
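
These helpers come into play when a tuned Pipeline is persisted with a param grid over its nested stages. A hedged PySpark sketch of such a grid (stage names and values are illustrative):

```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import ParamGridBuilder

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Each param map refers to params of nested stages (hashingTF, lr), not of
# the Pipeline itself; translating such maps to and from their Java
# counterparts is what the _transfer_param_map_* methods above handle.
grid = (ParamGridBuilder()
        .addGrid(hashingTF.numFeatures, [10, 100])
        .addGrid(lr.maxIter, [100, 200])
        .build())
print(len(grid))  # 4 param maps
```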


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch fix_pipeline_tuning created (now 2af493a)

2020-04-20 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch fix_pipeline_tuning
in repository https://gitbox.apache.org/repos/asf/spark.git.


  at 2af493a  init

This branch includes the following new commits:

 new c834fe8  init
 new 2af493a  init

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] 02/02: init

2020-04-20 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch fix_pipeline_tuning
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 2af493a016093d8499bf02bc37bc3e842ed74c62
Author: Weichen Xu 
AuthorDate: Mon Apr 20 17:47:35 2020 +0800

init
---
 python/pyspark/ml/pipeline.py | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 0004b64..3a71632 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -210,10 +210,12 @@ class Pipeline(Estimator, MLReadable, MLWritable):
 for pair in javaParamMap.toList():
 param = pair.param()
 if self.hasParam(str(param.name())):
-if param.name() == "classifier":
-paramMap[self.getParam(param.name())] = 
JavaParams._from_java(pair.value())
+java_obj = pair.value()
+if 
sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj):
+py_obj = JavaParams._from_java(java_obj)
 else:
-paramMap[self.getParam(param.name())] = _java2py(sc, 
pair.value())
+py_obj = _java2py(sc, java_obj)
+paramMap[self.getParam(param.name())] = py_obj
 return paramMap
 
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: Revert "[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset"

2020-02-17 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new cb890d9  Revert "[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 
'sementicHash' methods in Dataset"
cb890d9 is described below

commit cb890d96bc38860988dba97efaf6d88cc8c09288
Author: WeichenXu 
AuthorDate: Tue Feb 18 10:41:49 2020 +0800

Revert "[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' 
methods in Dataset"

This reverts commit ba9141592d0f0ce23c207efb21ae84ac7cc4670a.
---
 python/pyspark/sql/dataframe.py| 46 --
 python/pyspark/sql/tests/test_dataframe.py |  5 ---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 28 -
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 15 ---
 4 files changed, 94 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8325b68..2432b81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2153,52 +2153,6 @@ class DataFrame(PandasMapOpsMixin, 
PandasConversionMixin):
   "should have been DataFrame." % 
type(result)
 return result
 
-@since(3.1)
-def sameSemantics(self, other):
-"""
-Returns `True` when the logical query plans inside both 
:class:`DataFrame`\\s are equal and
-therefore return same results.
-
-.. note:: The equality comparison here is simplified by tolerating the 
cosmetic differences
-such as attribute names.
-
-.. note:: This API can compare both :class:`DataFrame`\\s very fast 
but can still return
-`False` on the :class:`DataFrame` that return the same results, 
for instance, from
-different plans. Such false negative semantic can be useful when 
caching as an example.
-
-.. note:: DeveloperApi
-
->>> df1 = spark.range(10)
->>> df2 = spark.range(10)
->>> df1.withColumn("col1", df1.id * 
2).sameSemantics(df2.withColumn("col1", df2.id * 2))
-True
->>> df1.withColumn("col1", df1.id * 
2).sameSemantics(df2.withColumn("col1", df2.id + 2))
-False
->>> df1.withColumn("col1", df1.id * 
2).sameSemantics(df2.withColumn("col0", df2.id * 2))
-True
-"""
-if not isinstance(other, DataFrame):
-raise ValueError("other parameter should be of DataFrame; however, 
got %s"
- % type(other))
-return self._jdf.sameSemantics(other._jdf)
-
-@since(3.1)
-def semanticHash(self):
-"""
-Returns a hash code of the logical query plan against this 
:class:`DataFrame`.
-
-.. note:: Unlike the standard hash code, the hash is calculated 
against the query plan
-simplified by tolerating the cosmetic differences such as 
attribute names.
-
-.. note:: DeveloperApi
-
->>> spark.range(10).selectExpr("id as col0").semanticHash()  # 
doctest: +SKIP
-1855039936
->>> spark.range(10).selectExpr("id as col1").semanticHash()  # 
doctest: +SKIP
-1855039936
-"""
-return self._jdf.semanticHash()
-
 where = copy_func(
 filter,
 sinceversion=1.3,
diff --git a/python/pyspark/sql/tests/test_dataframe.py 
b/python/pyspark/sql/tests/test_dataframe.py
index 942cd4b..d738449 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -782,11 +782,6 @@ class DataFrameTests(ReusedSQLTestCase):
 break
 self.assertEqual(df.take(8), result)
 
-def test_same_semantics_error(self):
-with QuietTest(self.sc):
-with self.assertRaisesRegexp(ValueError, "should be of 
DataFrame.*int"):
-self.spark.range(10).sameSemantics(1)
-
 
 class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
 # These tests are separate because it uses 
'spark.sql.queryExecutionListeners' which is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5cd2583..42f3535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3310,34 +3310,6 @@ class Dataset[T] private[sql](
 files.toSet.toArray
   }
 
-  /**
-   * Returns `true` when the logical query plans inside both [[Dataset]]s are 
equal and
-   * therefore return same results.
-   *
-   * @note The equality comp

[spark] branch master updated (4ed9b88 -> d8c0599)

2020-02-17 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4ed9b88  [SPARK-30832][DOCS] SQL function doc headers should link to 
anchors
 add d8c0599  [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 
'sementicHash' methods in Dataset

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/dataframe.py| 46 ++
 python/pyspark/sql/tests/test_dataframe.py |  5 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 28 +
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 15 +++
 4 files changed, 94 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset

2020-02-17 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new ba91415  [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 
'sementicHash' methods in Dataset
ba91415 is described below

commit ba9141592d0f0ce23c207efb21ae84ac7cc4670a
Author: Liang Zhang 
AuthorDate: Tue Feb 18 09:22:26 2020 +0800

[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods 
in Dataset

### What changes were proposed in this pull request?
This PR added two DeveloperApi methods to the Dataset[T] class. Both simply expose existing lower-level methods.

### Why are the changes needed?
They are useful for checking whether two dataframes are the same when implementing dataframe caching in Python, and for obtaining a hash of the query plan. Wrapping the lower-level APIs makes them easier to use.

### Does this PR introduce any user-facing change?
```
scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]

scala> df1.semanticHash
res0: Int = 594427822

scala> df2.semanticHash
res1: Int = 594427822

scala> df1.sameSemantics(df2)
res2: Boolean = true

scala> df1.sameSemantics(df3)
res3: Boolean = false

scala> df3.semanticHash
res4: Int = -1592702048

scala> df4.semanticHash
res5: Int = -1592702048

scala> df4.sameSemantics(df3)
res6: Boolean = true
```

### How was this patch tested?
Unit test in scala and doctest in python.

Note: comments are copied from the corresponding lower-level APIs.
Note: there are some known issues that, once fixed, would reduce the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028

Closes #27565 from liangz1/df-same-result.

Authored-by: Liang Zhang 
Signed-off-by: WeichenXu 
(cherry picked from commit d8c0599e542976ef70b60bc673e7c9732fce49e5)
Signed-off-by: WeichenXu 
---
 python/pyspark/sql/dataframe.py| 46 ++
 python/pyspark/sql/tests/test_dataframe.py |  5 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 28 +
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 15 +++
 4 files changed, 94 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2432b81..8325b68 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2153,6 +2153,52 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
                                               "should have been DataFrame." % type(result)
         return result
 
+    @since(3.1)
+    def sameSemantics(self, other):
+        """
+        Returns `True` when the logical query plans inside both :class:`DataFrame`\\s are equal and
+        therefore return same results.
+
+        .. note:: The equality comparison here is simplified by tolerating the cosmetic differences
+            such as attribute names.
+
+        .. note:: This API can compare both :class:`DataFrame`\\s very fast but can still return
+            `False` on the :class:`DataFrame` that return the same results, for instance, from
+            different plans. Such false negative semantic can be useful when caching as an example.
+
+        .. note:: DeveloperApi
+
+        >>> df1 = spark.range(10)
+        >>> df2 = spark.range(10)
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id * 2))
+        True
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id + 2))
+        False
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col0", df2.id * 2))
+        True
+        """
+        if not isinstance(other, DataFrame):
+            raise ValueError("other parameter should be of DataFrame; however, got %s"
+                             % type(other))
+        return self._jdf.sameSemantics(other._jdf)
+
+    @since(3.1)
+    def semanticHash(self):
+        """
+        Ret

[spark] branch branch-3.0 updated: [SPARK-30762] Add dtype=float32 support to vector_to_array UDF

2020-02-13 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 074712e  [SPARK-30762] Add dtype=float32 support to vector_to_array UDF
074712e is described below

commit 074712e329b347f769f8c009949c7845e95b3212
Author: Liang Zhang 
AuthorDate: Thu Feb 13 23:55:13 2020 +0800

[SPARK-30762] Add dtype=float32 support to vector_to_array UDF

### What changes were proposed in this pull request?
In this PR, we add a parameter to the Python function vector_to_array(col) that allows converting to a column of arrays of Float (32-bit) in Scala, which is mapped to a numpy array of dtype=float32.

### Why are the changes needed?
In downstream ML training, using float32 instead of the default float64 allows a larger batch size, i.e., more data fits in memory.

### Does this PR introduce any user-facing change?
Yes.
Old: `vector_to_array()` takes only one parameter
```
df.select(vector_to_array("colA"), ...)
```
New: `vector_to_array()` can take an additional optional parameter `dtype`, either "float32" or "float64" (the default)
```
df.select(vector_to_array("colA", "float32"), ...)
```
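
As a hedged end-to-end sketch (not from the patch itself; the session, DataFrame, and column names are illustrative), the element type of the resulting array column can be checked like this:

```
# Hedged sketch of the new optional dtype argument; DataFrame and column
# names here are illustrative only.
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(Vectors.dense([1.0, 2.0, 3.0]),),
     (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),)],
    ["vec"],
)

# Default keeps double precision; "float32" switches the array element type.
df.select(vector_to_array("vec").alias("arr")).printSchema()              # element: double
df.select(vector_to_array("vec", "float32").alias("arr")).printSchema()   # element: float
```

The smaller element type is what allows larger training batches to fit in memory downstream.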

### How was this patch tested?
Unit tests in Scala and a doctest in Python.

Closes #27522 from liangz1/udf-float32.

Authored-by: Liang Zhang 
Signed-off-by: WeichenXu 
(cherry picked from commit 82d0aa37ae521231d8067e473c6ea79a118a115a)
Signed-off-by: WeichenXu 
---
 .../main/scala/org/apache/spark/ml/functions.scala | 34 +++---
 .../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++---
 python/pyspark/ml/functions.py | 27 +
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
index 1faf562..0f03231 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/functions.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml
 
 import org.apache.spark.annotation.Since
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{SparseVector, Vector}
 import org.apache.spark.mllib.linalg.{Vector => OldVector}
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.udf
@@ -27,7 +27,6 @@ import org.apache.spark.sql.functions.udf
 @Since("3.0.0")
 object functions {
 // scalastyle:on
-
   private val vectorToArrayUdf = udf { vec: Any =>
     vec match {
       case v: Vector => v.toArray
@@ -39,10 +38,37 @@ object functions {
     }
   }.asNonNullable()
 
+  private val vectorToArrayFloatUdf = udf { vec: Any =>
+    vec match {
+      case v: SparseVector =>
+        val data = new Array[Float](v.size)
+        v.foreachActive { (index, value) => data(index) = value.toFloat }
+        data
+      case v: Vector => v.toArray.map(_.toFloat)
+      case v: OldVector => v.toArray.map(_.toFloat)
+      case v => throw new IllegalArgumentException(
+        "function vector_to_array requires a non-null input argument and input type must be " +
+        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
+        s"but got ${ if (v == null) "null" else v.getClass.getName }.")
+    }
+  }.asNonNullable()
+
   /**
    * Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
-   *
+   * @param v: the column of MLlib sparse/dense vectors
+   * @param dtype: the desired underlying data type in the returned array
+   * @return an array<float> if dtype is float32, or array<double> if dtype is float64
    * @since 3.0.0
    */
-  def vector_to_array(v: Column): Column = vectorToArrayUdf(v)
+  def vector_to_array(v: Column, dtype: String = "float64"): Column = {
+    if (dtype == "float64") {
+      vectorToArrayUdf(v)
+    } else if (dtype == "float32") {
+      vectorToArrayFloatUdf(v)
+    } else {
+      throw new IllegalArgumentException(
+        s"Unsupported dtype: $dtype. Valid values: float64, float32."
+      )
+    }
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
index 2f5062c..3dd9a7d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
@@ -34,9 +34,8 @@ class FunctionsSuite extends MLTest {
       (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
     ).toDF("vec", "oldVec")

[spark] branch master updated (fb0e07b -> 82d0aa3)

2020-02-13 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from fb0e07b  [SPARK-29231][SQL] Constraints should be inferred from cast equality constraint
 add 82d0aa3  [SPARK-30762] Add dtype=float32 support to vector_to_array UDF

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/ml/functions.scala | 34 +++---
 .../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++---
 python/pyspark/ml/functions.py | 27 +
 3 files changed, 81 insertions(+), 13 deletions(-)


[spark] branch master updated (79b10a1 -> 104b9b6)

2019-09-17 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 79b10a1  [SPARK-28929][CORE] Spark Logging level should be INFO instead of DEBUG in Executor Plugin API
 add 104b9b6  [SPARK-28483][FOLLOW-UP] Fix flaky test in BarrierTaskContextSuite

No new revisions were added by this update.

Summary of changes:
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 72 +-
 1 file changed, 30 insertions(+), 42 deletions(-)




[spark-website] branch asf-site updated: Add Weichen Xu to committer list

2019-09-10 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 7ade5ef  Add Weichen Xu to committer list
7ade5ef is described below

commit 7ade5efec5fca08252837d5b2e2bb2bee0494947
Author: WeichenXu 
AuthorDate: Tue Sep 10 17:36:55 2019 +0800

Add Weichen Xu to committer list

Add Weichen Xu to committer list.

Author: WeichenXu 

Closes #222 from WeichenXu123/add_weichen_committer.
---
 committers.md| 1 +
 site/committers.html | 4 
 2 files changed, 5 insertions(+)

diff --git a/committers.md b/committers.md
index d8ba384..2627f36 100644
--- a/committers.md
+++ b/committers.md
@@ -78,6 +78,7 @@ navigation:
 |Patrick Wendell|Databricks|
 |Andrew Xia|Alibaba|
 |Reynold Xin|Databricks|
+|Weichen Xu|Databricks|
 |Takeshi Yamamuro|NTT|
 |Burak Yavuz|Databricks|
 |Matei Zaharia|Databricks, Stanford|
diff --git a/site/committers.html b/site/committers.html
index 1c950ff..f5da5ab 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -483,6 +483,10 @@
   <td>Databricks</td>
 </tr>
 <tr>
+  <td>Weichen Xu</td>
+  <td>Databricks</td>
+</tr>
+<tr>
   <td>Takeshi Yamamuro</td>
   <td>NTT</td>
 </tr>

