Repository: spark
Updated Branches:
  refs/heads/master 9bb3a0c67 -> 8c2edf46d


[SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to 
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

## What changes were proposed in this pull request?

Add the legacy prefix for 
spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to 
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName

## How was this patch tested?
The existing tests.

Closes #22540 from gatorsmile/renameAssignColumnsByPosition.

Lead-authored-by: gatorsmile <gatorsm...@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c2edf46
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c2edf46
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c2edf46

Branch: refs/heads/master
Commit: 8c2edf46d0f89e5ec54968218d89f30a3f8190bc
Parents: 9bb3a0c
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Wed Sep 26 09:32:51 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Wed Sep 26 09:32:51 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                       |  3 ++-
 python/pyspark/worker.py                          |  7 ++++---
 .../org/apache/spark/sql/internal/SQLConf.scala   | 18 +++++++++---------
 .../spark/sql/execution/arrow/ArrowUtils.scala    |  9 +++------
 4 files changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b829bae..74642d4 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -5802,7 +5802,8 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         import pandas as pd
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-        with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}):
+        with self.sql_conf({
+                "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
 
             @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP)
             def foo(_):

http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 974344f..8c59f1f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -97,8 +97,9 @@ def wrap_scalar_pandas_udf(f, return_type):
 
 
 def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
-    assign_cols_by_pos = runner_conf.get(
-        "spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False)
+    assign_cols_by_name = runner_conf.get(
+        "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")
+    assign_cols_by_name = assign_cols_by_name.lower() == "true"
 
     def wrapped(key_series, value_series):
         import pandas as pd
@@ -119,7 +120,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
                 "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
 
         # Assign result columns by schema name if user labeled with strings, else use position
-        if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns):
+        if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):
             return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type]
         else:
             return [(result[result.columns[i]], to_arrow_type(field.dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0e0a01d..e7c9a83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1295,15 +1295,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
-    buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
+  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME =
+    buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName")
       .internal()
-      .doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
-        "Pandas DataFrame based on position, regardless of column label type. When false, " +
-        "columns will be looked up by name if labeled with a string and fallback to use " +
-        "position if not. This configuration will be deprecated in future releases.")
+      .doc("When true, columns will be looked up by name if labeled with a string and fallback " +
+        "to use position if not. When false, a grouped map Pandas UDF will assign columns from " +
+        "the returned Pandas DataFrame based on position, regardless of column label type. " +
+        "This configuration will be deprecated in future releases.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val REPLACE_EXCEPT_WITH_FILTER = 
buildConf("spark.sql.optimizer.replaceExceptWithFilter")
     .internal()
@@ -1915,8 +1915,8 @@ class SQLConf extends Serializable with Logging {
 
   def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE)
 
-  def pandasGroupedMapAssignColumnssByPosition: Boolean =
-    getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION)
+  def pandasGroupedMapAssignColumnsByName: Boolean =
+    getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME)
 
   def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
index 533097a..b1e8fb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
@@ -131,11 +131,8 @@ object ArrowUtils {
     } else {
       Nil
     }
-    val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnssByPosition) {
-      Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> "true")
-    } else {
-      Nil
-    }
-    Map(timeZoneConf ++ pandasColsByPosition: _*)
+    val pandasColsByName = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key ->
+      conf.pandasGroupedMapAssignColumnsByName.toString)
+    Map(timeZoneConf ++ pandasColsByName: _*)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to