zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570108063



##########
File path: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
##########
@@ -223,6 +223,13 @@ class OneVsRestSuite extends MLTest with DefaultReadWriteTest {
     assert(oldCols === newCols)
   }
 
+  test("SPARK-SPARK-34356: OneVsRestModel.transform should avoid potential 
column conflict") {

Review comment:
       This test will fail in master and (maybe) all versions of OVR,
   but I think fixing it in master may be enough.
   
   
![image](https://user-images.githubusercontent.com/7322292/106879331-f78b2e80-6715-11eb-8ffe-1af4c2a08120.png)
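
   For readers without the PR open, here is a minimal sketch of the conflict this test guards against (hypothetical data, assuming an active `spark` session; the column names are illustrative, not the exact test fixture):

   ```scala
   import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
   import org.apache.spark.ml.linalg.Vectors
   import org.apache.spark.sql.functions.lit

   // Input that already carries a "rawPrediction" column. fit() only selects
   // the label/features columns, so training succeeds, but the old transform()
   // reused the binary models' default output column names and failed with
   // "Output column rawPrediction already exists".
   val df = spark.createDataFrame(Seq(
     (0.0, Vectors.dense(0.0, 1.0)),
     (1.0, Vectors.dense(1.0, 0.0)),
     (2.0, Vectors.dense(1.0, 1.0))
   )).toDF("label", "features").withColumn("rawPrediction", lit(0.0))

   val ovrModel = new OneVsRest().setClassifier(new LogisticRegression()).fit(df)
   ovrModel.transform(df)  // threw before this patch; works with temp column names
   ```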
   

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)

Review comment:
       OK, I will ignore similar cases in the future.
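
   A brief note on the caching idiom under discussion: each binary model triggers another pass over the data, so `transform()` caches the frame only when the caller has not already persisted it. A sketch of the full pattern (the `unpersist` call sits in a part of the diff not quoted in this mail, so treat its placement as assumed):

   ```scala
   import org.apache.spark.storage.StorageLevel

   // Cache only batch (non-streaming) inputs the caller has not persisted.
   val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
   if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)

   // ... fold over the binary models to score newDataset ...

   // Release the cache once all models have been applied.
   if (handlePersistence) newDataset.unpersist()
   ```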

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
        val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       no, `tmpModel` is just a temporary model
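
   To spell out why the copy makes this safe (a sketch; `"prediction"` is the library default for the output column name):

   ```scala
   import org.apache.spark.ml.classification.ClassificationModel
   import org.apache.spark.ml.param.ParamMap

   // copy(ParamMap.empty) yields an independent instance, so the setters
   // below only touch the throwaway model; the user's `model` keeps its params.
   val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
   tmpModel.setPredictionCol("")
   assert(model.getPredictionCol == "prediction")  // original default, unchanged
   ```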

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
        val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")
+        tmpModel match {
+          case m: ProbabilisticClassificationModel[_, _] => m.setProbabilityCol("")
+          case _ =>

Review comment:
       ok
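
   For anyone skimming: an empty output column name is the standard `spark.ml` switch for telling `transform()` to skip producing that column, which both saves work and avoids further name clashes. An annotated restatement of the hunk above:

   ```scala
   import org.apache.spark.ml.classification.ProbabilisticClassificationModel

   // An empty output column name makes transform() skip that column entirely.
   tmpModel.setPredictionCol("")
   tmpModel match {
     // Only probabilistic models expose a probability column, hence the match.
     case m: ProbabilisticClassificationModel[_, _] => m.setProbabilityCol("")
     case _ => // nothing extra to disable for non-probabilistic models
   }
   ```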

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
        val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       `val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]`
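
   For context on how the temporary raw prediction column feeds the accumulator: that part of the hunk is not quoted in this mail, so the exact expression in the patch may differ, but the update is along these lines (a sketch of the approach):

   ```scala
   import org.apache.spark.ml.linalg.Vector
   import org.apache.spark.sql.functions.{col, udf}

   // Append this model's positive-class score to the accumulator array, then
   // drop the temporary raw prediction column so no extra columns leak out.
   val updateUDF = udf { (acc: Seq[Double], raw: Vector) => acc :+ raw(1) }
   val updated = tmpModel.transform(df)
     .withColumn(accColName, updateUDF(col(accColName), col(tmpRawPredName)))
     .drop(tmpRawPredName)
   ```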



