[spark] branch master updated (08678456d16 -> 7b8016a578f)

2022-09-22 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 08678456d16 [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS
 add 7b8016a578f [SPARK-38098][PYTHON] Add support for ArrayType of nested 
StructType to arrow-based conversion

No new revisions were added by this update.

Summary of changes:
 python/docs/source/user_guide/sql/arrow_pandas.rst |  5 +--
 python/pyspark/sql/pandas/types.py |  9 -
 python/pyspark/sql/pandas/utils.py | 13 
 python/pyspark/sql/tests/test_dataframe.py | 26 +++
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 38 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  4 +--
 6 files changed, 81 insertions(+), 14 deletions(-)
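
As a quick, hedged illustration of what this change enables (it assumes a local PySpark build that includes this commit, a sufficiently new pyarrow, and pandas installed; the config and method names are the standard ones):

```python
from pyspark.sql import Row, SparkSession

spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# A column whose type is array<struct<x:bigint,y:string>>.
df = spark.createDataFrame([Row(items=[Row(x=1, y="a"), Row(x=2, y="b")])])

# With this change the Arrow-based conversion handles the nested struct
# elements directly; earlier releases typically fell back to the slower
# non-Arrow path for this schema.
pdf = df.toPandas()
print(pdf["items"][0])
```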


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (f5c3f0c228f -> e50a37a587f)

2022-05-12 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from f5c3f0c228f [SPARK-39164][SQL] Wrap asserts/illegal state exceptions 
by the INTERNAL_ERROR exception in actions
 add e50a37a587f [SPARK-39160][SQL] Remove workaround for ARROW-1948

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/vectorized/ArrowColumnVector.java  | 13 +
 1 file changed, 1 insertion(+), 12 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-34521][PYTHON][SQL] Fix spark.createDataFrame when using pandas with StringDtype

2021-12-15 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 816aba3  [SPARK-34521][PYTHON][SQL] Fix spark.createDataFrame when 
using pandas with StringDtype
816aba3 is described below

commit 816aba355cf8601688ba765b9c852b7feb64d3c2
Author: Nicolas Azrak 
AuthorDate: Wed Dec 15 22:03:19 2021 -0800

[SPARK-34521][PYTHON][SQL] Fix spark.createDataFrame when using pandas with 
StringDtype

### What changes were proposed in this pull request?

This change fixes `SPARK-34521`. It allows creating a Spark DataFrame from a pandas DataFrame that uses a `StringDtype` column when Arrow-based conversion (`spark.sql.execution.arrow.pyspark.enabled`) is enabled.

### Why are the changes needed?

Pandas stores string columns in two different ways: as a numpy `ndarray` or as a custom `StringArray`. The `StringArray` version is used when `dtype="string"` is specified. When that happens, Spark cannot serialize the column to Arrow. Converting the `Series` beforehand fixes this problem.

However, because of these two representations, `spark.createDataFrame(pandas_dataframe).toPandas()` might not be equal to `pandas_dataframe`: the column dtype could be different.

More info: https://pandas.pydata.org/docs/user_guide/text.html
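
A minimal sketch of the underlying pyarrow behavior (illustrative only, not the patch itself; it assumes pandas >= 1.0 and a recent pyarrow):

```python
import pandas as pd
import pyarrow as pa

s = pd.Series(["a", "b", "c"], dtype="string")  # backed by pandas StringArray
mask = s.isnull()

# StringArray implements __arrow_array__, so passing a mask raises:
# "Cannot specify a mask or a size when passing an object that is
#  converted with the __arrow_array__ protocol."
try:
    pa.Array.from_pandas(s, mask=mask, type=pa.string())
except ValueError as e:
    print("fails:", e)

# Converting the Series to a plain object-dtype representation first is one
# way to sidestep the protocol, in the spirit of the fix described above.
arr = pa.Array.from_pandas(s.astype(object), mask=mask, type=pa.string())
print(arr)
```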

### Does this PR introduce _any_ user-facing change?

Creating a Spark `DataFrame` from a pandas `DataFrame` that uses the string dtype with `"spark.sql.execution.arrow.pyspark.enabled"` set no longer throws an exception and returns the expected DataFrame.

Before:

`spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string"))`
```
Error
Traceback (most recent call last):
  File "/home/nico/projects/playground/spark/python/pyspark/sql/tests/test_arrow.py", line 415, in test_createDataFrame_with_string_dtype
    print(self.spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string")))
  File "/home/nico/projects/playground/spark/python/pyspark/sql/session.py", line 823, in createDataFrame
    return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/conversion.py", line 358, in createDataFrame
    return self._create_from_pandas_with_arrow(data, schema, timezone)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/conversion.py", line 550, in _create_from_pandas_with_arrow
    self._sc  # type: ignore[attr-defined]
  File "/home/nico/projects/playground/spark/python/pyspark/context.py", line 611, in _serialize_to_jvm
    serializer.dump_stream(data, tempFile)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 221, in dump_stream
    super(ArrowStreamPandasSerializer, self).dump_stream(batches, stream)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 220, in <genexpr>
    batches = (self._create_batch(series) for series in iterator)
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 211, in _create_batch
    arrs.append(create_array(s, t))
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 185, in create_array
    raise e
  File "/home/nico/projects/playground/spark/python/pyspark/sql/pandas/serializers.py", line 175, in create_array
    array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
  File "pyarrow/array.pxi", line 904, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 252, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 107, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.

```

After:

`spark.createDataFrame(pd.DataFrame({"A": ['a', 'b', 'c']}, dtype="string"))`
> `DataFrame[A: string]`

### How was this patch tested?

Using the `test_createDataFrame_with_string_dtype` test.

Closes #34509 from nicolasazrak/SPARK-34521.

Authored-by: Nicolas Azrak 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/pandas/serializers.py |  5 -
 python/pyspark/sql/tests/test_arrow.py   | 21 +
 2 files changed, 25 insertions(+), 1 deletion(-)

[spark] branch master updated: [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

2021-02-10 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9b875ce  [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to 
toPandas
9b875ce is described below

commit 9b875ceada60732899053fbd90728b4944d1c03d
Author: David Li 
AuthorDate: Wed Feb 10 09:58:46 2021 -0800

[SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

### What changes were proposed in this pull request?

Creating a Pandas dataframe via Apache Arrow currently can use twice as 
much memory as the final result, because during the conversion, both Pandas and 
Arrow retain a copy of the data. Arrow has a "self-destruct" mode now (Arrow >= 
0.16) to avoid this, by freeing each column after conversion. This PR 
integrates support for this in toPandas, handling a couple of edge cases:

self_destruct has no effect unless the memory is allocated appropriately, 
which is handled in the Arrow serializer here. Essentially, the issue is that 
self_destruct frees memory column-wise, but Arrow record batches are oriented 
row-wise:

```
Record batch 0: allocation 0: column 0 chunk 0, column 1 chunk 0, ...
Record batch 1: allocation 1: column 0 chunk 1, column 1 chunk 1, ...
```

In this scenario, Arrow will drop references to all of column 0's chunks, 
but no memory will actually be freed, as the chunks were just slices of an 
underlying allocation. The PR copies each column into its own allocation so that memory is instead arranged as follows:

```
Record batch 0: allocation 0 column 0 chunk 0, allocation 1 column 1 chunk 0, ...
Record batch 1: allocation 2 column 0 chunk 1, allocation 3 column 1 chunk 1, ...
```

The optimization is disabled by default, and can be enabled with the Spark 
SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled" set to 
"true". We can't always apply this optimization because it's more likely to 
generate a dataframe with immutable buffers, which Pandas doesn't always handle 
well, and because it is slower overall (since it only converts one column at a 
time instead of in parallel).
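
For reference, a small pyarrow-level sketch of the mechanism described above (it assumes pyarrow >= 0.16; the exact options the Spark serializer sets may differ):

```python
import pyarrow as pa

table = pa.table({"a": list(range(100_000)), "b": [float(i) for i in range(100_000)]})

# self_destruct frees each Arrow column as soon as it has been converted.
# It only pays off when each column sits in its own allocation and the
# conversion runs column by column, hence the three options together.
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)

# The table must not be used again after a self-destructing conversion.
del table
print(pdf.shape)
```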

### Why are the changes needed?

This lets us load larger datasets. In particular, with N bytes of memory, we previously could not load a dataset bigger than N/2 bytes; now the overhead is closer to N/1.25.

### Does this PR introduce _any_ user-facing change?

Yes - it adds a new SQL conf 
"spark.sql.execution.arrow.pyspark.selfDestruct.enabled"

### How was this patch tested?

See the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html) - it was tested with Python memory_profiler. Unit tests were added to check that memory stays within certain bounds and that results are correct with the option enabled.

Closes #29818 from lidavidm/spark-32953.

Authored-by: David Li 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/pandas/conversion.py| 48 --
 python/pyspark/sql/tests/test_arrow.py | 33 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala| 13 ++
 3 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index d8a2414..92ef7ce 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -105,13 +105,29 @@ class PandasConversionMixin(object):
                 import pyarrow
                 # Rename columns to avoid duplicated column names.
                 tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
-                batches = self.toDF(*tmp_column_names)._collect_as_arrow()
+                self_destruct = self.sql_ctx._conf.arrowPySparkSelfDestructEnabled()
+                batches = self.toDF(*tmp_column_names)._collect_as_arrow(
+                    split_batches=self_destruct)
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
+                    # Ensure only the table has a reference to the batches, so that
+                    # self_destruct (if enabled) is effective
+                    del batches
                     # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
-                    pdf = table.to_pandas(date_as_object=True)
+                    pandas_options = {'date_as_object': True}

[spark] branch master updated (7aed81d -> 66005a3)

2020-10-21 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 7aed81d  [SPARK-33202][CORE] Fix BlockManagerDecommissioner to return 
the correct migration status
 add 66005a3  [SPARK-31964][PYTHON][FOLLOW-UP] Use is_categorical_dtype 
instead of deprecated is_categorical

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/pandas/serializers.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
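
A tiny illustrative sketch of the substitution (it assumes pandas 1.x, where `is_categorical` is deprecated in favor of the dtype check):

```python
import pandas as pd
from pandas.api.types import is_categorical_dtype

s = pd.Series(["a", "b", "a"], dtype="category")

# The serializer now asks the same question with the non-deprecated helper.
print(is_categorical_dtype(s.dtype))  # True
```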


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (e3a88a9 -> 41cf1d0)

2020-08-24 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e3a88a9  [SPARK-32516][SQL] 'path' option cannot coexist with load()'s 
path parameters
 add 41cf1d0  [SPARK-32686][PYTHON] Un-deprecate inferring DataFrame schema 
from list of dict

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/session.py | 15 ---
 1 file changed, 4 insertions(+), 11 deletions(-)
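
A minimal sketch of the behavior this restores (it assumes a local session; before this change the call emitted a deprecation warning suggesting `Row` instead):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Inferring the schema from a list of dicts works again without a
# deprecation warning after this change.
df = spark.createDataFrame([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])
df.printSchema()
df.show()
```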


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32300][PYTHON][2.4] toPandas should work from a Spark DataFrame with no partitions

2020-07-14 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 5084c71  [SPARK-32300][PYTHON][2.4] toPandas should work from a Spark 
DataFrame with no partitions
5084c71 is described below

commit 5084c7100b20c49c423ababdd4fcd59eaebd9909
Author: HyukjinKwon 
AuthorDate: Tue Jul 14 13:28:36 2020 -0700

[SPARK-32300][PYTHON][2.4] toPandas should work from a Spark DataFrame with 
no partitions

### What changes were proposed in this pull request?

This PR proposes to simply bypass the case where the array size would be negative when collecting data from a Spark DataFrame with no partitions for `toPandas` with Arrow optimization enabled.

```python
spark.sparkContext.emptyRDD().toDF("col1 int").toPandas()
```

In master and branch-3.0, this was fixed as part of https://github.com/apache/spark/commit/ecaa495b1fe532c36e952ccac42f4715809476af, but that change was legitimately not ported back to branch-2.4.

### Why are the changes needed?

To make an empty Spark DataFrame convertible to a pandas DataFrame.

### Does this PR introduce _any_ user-facing change?

Yes,

```python
spark.sparkContext.emptyRDD().toDF("col1 int").toPandas()
```

**Before:**

```
...
Caused by: java.lang.NegativeArraySizeException
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
...
```

**After:**

```
Empty DataFrame
Columns: [col1]
Index: []
```

### How was this patch tested?

Manually tested, and a unit test was added.

Closes #29098 from HyukjinKwon/SPARK-32300.

Authored-by: HyukjinKwon 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/tests.py| 6 ++
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 49acf04..c144b41 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4579,6 +4579,12 @@ class ArrowTests(ReusedSQLTestCase):
             self.spark.createDataFrame(
                 pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])).distinct().count(), 3)
 
+    def test_no_partition_toPandas(self):
+        # SPARK-32300: toPandas should work from a Spark DataFrame with no partitions
+        pdf = self.spark.sparkContext.emptyRDD().toDF("col1 int").toPandas()
+        self.assertEqual(len(pdf), 0)
+        self.assertEqual(list(pdf.columns), ["col1"])
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a755a6f..6e45775 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3290,7 +3290,7 @@ class Dataset[T] private[sql](
     val numPartitions = arrowBatchRdd.partitions.length
 
     // Store collection results for worst case of 1 to N-1 partitions
-    val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+    val results = new Array[Array[Array[Byte]]](Math.max(0, numPartitions - 1))
     var lastIndex = -1  // index of last partition written
 
     // Handler to eagerly write partitions to Python in order


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated (a4854d6 -> 5084c71)

2020-07-14 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.


from a4854d6  [MINOR][DOCS] Fix typo in PySpark example in ml-datasource.md
 add 5084c71  [SPARK-32300][PYTHON][2.4] toPandas should work from a Spark 
DataFrame with no partitions

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests.py| 6 ++
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow

2020-06-25 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new a295003  [SPARK-32098][PYTHON] Use iloc for positional slicing instead 
of direct slicing in createDataFrame with Arrow
a295003 is described below

commit a295003d48df719fb92c1ee3547dbfd7df85424b
Author: HyukjinKwon 
AuthorDate: Thu Jun 25 11:04:47 2020 -0700

[SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct 
slicing in createDataFrame with Arrow

When you use floats as the index of a pandas DataFrame, it creates a Spark DataFrame with wrong results, as shown below, when Arrow is enabled:

```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```

```python
>>> import pandas as pd
>>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+
```

This is because direct slicing uses the value as index when the index 
contains floats:

```python
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:]
 a
2.0  1
3.0  2
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:]
 a
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:]
   a
4  3
```

This PR proposes to explicitly use `iloc` to slice positionally when we create a DataFrame from a pandas DataFrame with Arrow enabled.

FWIW, I was trying to investigate why direct slicing sometimes refers to the index value and sometimes to the positional index, but I stopped investigating further after reading https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#selection

> While standard Python / Numpy expressions for selecting and setting are 
intuitive and come in handy for interactive work, for production code, we 
recommend the optimized pandas data access methods, `.at`, `.iat`, `.loc` and 
`.iloc`.

To create the correct Spark DataFrame from a pandas DataFrame without data loss.

Yes, it is a bug fix.

```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```
```python
import pandas as pd
spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
```

Before:

```
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+
```

After:

```
+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+
```

Manually tested, and a unit test was added.

Closes #28928 from HyukjinKwon/SPARK-32098.

Authored-by: HyukjinKwon 
Signed-off-by: Bryan Cutler 
(cherry picked from commit 1af19a7b6836f87a3b34189a8a13b6d21d3a37d8)
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/session.py | 2 +-
 python/pyspark/sql/tests.py   | 7 +++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index fc18f57..bfb0f1c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -524,7 +524,7 @@ class SparkSession(object):
 
         # Slice the DataFrame to be batched
         step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
-        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+        pdf_slices = (pdf.iloc[start:start + step] for start in xrange(0, len(pdf), step))
 
         # Create Arrow record batches
         batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1bbf08d..49acf04 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4572,6 +4572,13 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertPandasEqual(pdf, df_from_python.toPandas())
         self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_createDataFrame_with_float_index(self):
+        import pandas as pd
+        # SPARK-32098: float index should not produce duplicated or truncated Spark DataFrame
+        self.assertEqual(
+            self.spark.createDataFrame(
+                pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])).distinct().count(), 3)
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow

2020-06-25 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 2d6232a  [SPARK-32098][PYTHON] Use iloc for positional slicing instead 
of direct slicing in createDataFrame with Arrow
2d6232a is described below

commit 2d6232aac476e8afe323fd56d461fadaf2550bb1
Author: HyukjinKwon 
AuthorDate: Thu Jun 25 11:04:47 2020 -0700

[SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct 
slicing in createDataFrame with Arrow

When you use floats as the index of a pandas DataFrame, it creates a Spark DataFrame with wrong results, as shown below, when Arrow is enabled:

```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```

```python
>>> import pandas as pd
>>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+
```

This is because direct slicing uses the value as index when the index 
contains floats:

```python
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:]
 a
2.0  1
3.0  2
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:]
 a
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:]
   a
4  3
```

This PR proposes to explicitly use `iloc` to slice positionally when we create a DataFrame from a pandas DataFrame with Arrow enabled.

FWIW, I was trying to investigate why direct slicing sometimes refers to the index value and sometimes to the positional index, but I stopped investigating further after reading https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#selection

> While standard Python / Numpy expressions for selecting and setting are 
intuitive and come in handy for interactive work, for production code, we 
recommend the optimized pandas data access methods, `.at`, `.iat`, `.loc` and 
`.iloc`.

To create the correct Spark DataFrame from a pandas DataFrame without data loss.

Yes, it is a bug fix.

```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```
```python
import pandas as pd
spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
```

Before:

```
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+
```

After:

```
+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+
```

Manually tested, and a unit test was added.

Closes #28928 from HyukjinKwon/SPARK-32098.

Authored-by: HyukjinKwon 
Signed-off-by: Bryan Cutler 
(cherry picked from commit 1af19a7b6836f87a3b34189a8a13b6d21d3a37d8)
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/pandas/conversion.py | 2 +-
 python/pyspark/sql/tests/test_arrow.py  | 6 ++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 251625a..e6d8e9f 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -413,7 +413,7 @@ class SparkConversionMixin(object):
 
         # Slice the DataFrame to be batched
         step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
-        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+        pdf_slices = (pdf.iloc[start:start + step] for start in xrange(0, len(pdf), step))
 
         # Create list of Arrow (columns, type) for serializer dump_stream
         arrow_data = [[(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 004c79f..1386f8d 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -415,6 +415,12 @@ class ArrowTests(ReusedSQLTestCase):
         for case in cases:
             run_test(*case)
 
+    def test_createDataFrame_with_float_index(self):
+        # SPARK-32098: float index should not produce duplicated or truncated Spark DataFrame
+        self.assertEqual(
+            self.spark.createDataFrame(
+                pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])).distinct().count(), 3)
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (d06604f -> 1af19a7)

2020-06-25 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d06604f  [SPARK-32078][DOC] Add a redirect to sql-ref from 
sql-reference
 add 1af19a7  [SPARK-32098][PYTHON] Use iloc for positional slicing instead 
of direct slicing in createDataFrame with Arrow

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/pandas/conversion.py | 2 +-
 python/pyspark/sql/tests/test_arrow.py  | 6 ++
 2 files changed, 7 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs

2020-06-10 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 00d06ca  [SPARK-31915][SQL][PYTHON] Resolve the grouping column 
properly per the case sensitivity in grouped and cogrouped pandas UDFs
00d06ca is described below

commit 00d06cad564d5e3e5f78a687776d02fe0695a861
Author: HyukjinKwon 
AuthorDate: Wed Jun 10 15:54:07 2020 -0700

[SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the 
case sensitivity in grouped and cogrouped pandas UDFs

### What changes were proposed in this pull request?

This is another approach to fix the issue. See the previous try: https://github.com/apache/spark/pull/28745. It was too invasive, so I took a more conservative approach.

This PR proposes to resolve the grouping attributes separately first so they can be properly referred to when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved, without ambiguity.

Previously,

```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()
```

failed as below:

```
pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
```
because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know 
which reference to take from the child projection.

After this fix, it resolves the child projection first with the grouping keys and passes to `FlatMapGroupsInPandas` the positionally selected attribute from the child projection as the grouping key.

### Why are the changes needed?

To resolve grouping keys correctly.

### Does this PR introduce _any_ user-facing change?

Yes,

```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()
```

```python
df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

df1.groupby("COLUMN").cogroup(
    df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()
```

Before:

```
pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
```

```
pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], (column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
:  +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
   +- LogicalRDD [column#13L, value#14L], false
```

After:

```
+--+-+
|column|Score|
+--+-+
| 1|  0.5|
+--+-+
```

```
+--+-+
|column|value|
+--+-+
| 2|2|
+--+-+
```

### How was this patch tested?

Unit tests were added, and the change was manually tested.

Closes #28777 from HyukjinKwon/SPARK-31915-another.

Authored-by: HyukjinKwon 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/tests/test_pandas_cogrouped_map.py  | 18 +-
 python/pyspark/sql/tests/test_pandas_grouped_map.py| 10 ++
 .../apache/spark/sql/RelationalGroupedDataset.scala| 17 ++---
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 3ed9d2a..c1cb30c 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -19,7 +19,7 @@ import unittest
 import sys
 
 from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
-from pyspark.sql.types import DoubleType, StructType, StructField
+from pyspark.sql.types import DoubleType, StructType, StructField, Row
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest
@@ -193,6 +193,22 @@ class CogroupedMapInPan

[spark] branch master updated: [SPARK-25351][SQL][PYTHON] Handle Pandas category type when converting from Python with Arrow

2020-05-27 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 339b0eca [SPARK-25351][SQL][PYTHON] Handle Pandas category type when 
converting from Python with Arrow
339b0eca is described below

commit 339b0ecadb9c66ec8a62fd1f8e5a7a266b465aef
Author: Jalpan Randeri 
AuthorDate: Wed May 27 17:27:29 2020 -0700

[SPARK-25351][SQL][PYTHON] Handle Pandas category type when converting from 
Python with Arrow

Handle the pandas category type when converting from Python with Arrow enabled. The category column will be converted to whatever type the category elements are, as is the case with Arrow disabled.
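
A short usage sketch of the behavior described above (it assumes a local session with Arrow enabled; it mirrors the new unit test rather than reproducing it):

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

pdf = pd.DataFrame({"A": ["a", "b", "c", "a"]})
pdf["B"] = pdf["A"].astype("category")

# The categorical column arrives as its element type (string here),
# matching the behavior with Arrow disabled.
df = spark.createDataFrame(pdf)
df.printSchema()  # B: string
```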

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New unit tests were added for `createDataFrame` and scalar `pandas_udf`

Closes #26585 from jalpan-randeri/feature-pyarrow-dictionary-type.

Authored-by: Jalpan Randeri 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/pandas/serializers.py   |  3 +++
 python/pyspark/sql/pandas/types.py |  2 ++
 python/pyspark/sql/tests/test_arrow.py | 26 ++
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 21 +
 4 files changed, 52 insertions(+)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 4dd15d1..ff0b10a 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -154,6 +154,9 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
             # Ensure timestamp series are in expected form for Spark internal representation
             if t is not None and pa.types.is_timestamp(t):
                 s = _check_series_convert_timestamps_internal(s, self._timezone)
+            elif type(s.dtype) == pd.CategoricalDtype:
+                # Note: This can be removed once minimum pyarrow version is >= 0.16.1
+                s = s.astype(s.dtypes.categories.dtype)
             try:
                 array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
             except pa.ArrowException as e:
diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py
index d1edf3f..4b70c8a 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -114,6 +114,8 @@ def from_arrow_type(at):
         return StructType(
             [StructField(field.name, from_arrow_type(field.type), nullable=field.nullable)
              for field in at])
+    elif types.is_dictionary(at):
+        spark_type = from_arrow_type(at.value_type)
     else:
         raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
     return spark_type
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 004c79f..c3c9fb0 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -415,6 +415,32 @@ class ArrowTests(ReusedSQLTestCase):
         for case in cases:
             run_test(*case)
 
+    def test_createDateFrame_with_category_type(self):
+        pdf = pd.DataFrame({"A": [u"a", u"b", u"c", u"a"]})
+        pdf["B"] = pdf["A"].astype('category')
+        category_first_element = dict(enumerate(pdf['B'].cat.categories))[0]
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
+            arrow_df = self.spark.createDataFrame(pdf)
+            arrow_type = arrow_df.dtypes[1][1]
+            result_arrow = arrow_df.toPandas()
+            arrow_first_category_element = result_arrow["B"][0]
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
+            df = self.spark.createDataFrame(pdf)
+            spark_type = df.dtypes[1][1]
+            result_spark = df.toPandas()
+            spark_first_category_element = result_spark["B"][0]
+
+        assert_frame_equal(result_spark, result_arrow)
+
+        # ensure original category elements are string
+        assert isinstance(category_first_element, str)
+        # spark data frame and arrow execution mode enabled data frame type must match pandas
+        assert spark_type == arrow_type == 'string'
+        assert isinstance(arrow_first_category_element, str)
+        assert isinstance(spark_first_category_element, str)
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 7260e80..ae6b8d5 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py

[spark] branch master updated (d0800fc -> 43d9c7e)

2020-01-26 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d0800fc  [SPARK-30314] Add identifier and catalog information to 
DataSourceV2Relation
 add 43d9c7e  [SPARK-30640][PYTHON][SQL] Prevent unnecessary copies of data 
during Arrow to Pandas conversion

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/pandas/conversion.py  |  8 ++--
 python/pyspark/sql/pandas/serializers.py |  7 +--
 python/pyspark/sql/pandas/types.py   | 16 
 3 files changed, 11 insertions(+), 20 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (b5bc3e1 -> f372d1c)

2020-01-10 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b5bc3e1  [SPARK-30312][SQL] Preserve path permission and acl when 
truncate table
 add f372d1c  [SPARK-29748][PYTHON][SQL] Remove Row field sorting in 
PySpark for version 3.6+

No new revisions were added by this update.

Summary of changes:
 docs/pyspark-migration-guide.md|  2 ++
 python/pyspark/sql/tests/test_types.py | 13 
 python/pyspark/sql/types.py| 56 +++---
 python/run-tests.py|  3 +-
 4 files changed, 62 insertions(+), 12 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (3d2a6f4 -> e804ed5)

2019-11-19 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 3d2a6f4  [SPARK-29906][SQL] AQE should not introduce extra shuffle for 
outermost limit
 add e804ed5  [SPARK-29691][ML][PYTHON] ensure Param objects are valid in 
fit, transform

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/param/__init__.py| 12 ++--
 python/pyspark/ml/tests/test_param.py  |  4 
 python/pyspark/ml/tests/test_tuning.py |  9 +
 python/pyspark/ml/tuning.py|  8 +++-
 4 files changed, 30 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (0bdadba -> 7fc9db0)

2019-11-08 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 0bdadba  [SPARK-29790][DOC] Note required port for Kube API
 add 7fc9db0  [SPARK-29798][PYTHON][SQL] Infers bytes as binary type in 
createDataFrame in Python 3 at PySpark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/test_types.py | 16 
 python/pyspark/sql/types.py|  5 +
 2 files changed, 21 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (00347a3 -> 901ff92)

2019-10-17 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 00347a3  [SPARK-28762][CORE] Read JAR main class if JAR is not located 
in local file system
 add 901ff92  [SPARK-29464][PYTHON][ML] PySpark ML should expose 
Params.clear() to unset a user supplied Param

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/classification.py   |  4 ++--
 python/pyspark/ml/param/__init__.py   |  2 +-
 python/pyspark/ml/tests/test_param.py | 20 ++--
 python/pyspark/ml/wrapper.py  |  8 
 python/pyspark/testing/mlutils.py |  2 +-
 5 files changed, 30 insertions(+), 6 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (6390f02 -> beb8d2f)

2019-10-11 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 6390f02  [SPARK-29367][DOC] Add compatibility note for Arrow 0.15.0 to 
SQL guide
 add beb8d2f  [SPARK-29402][PYTHON][TESTS] Added tests for grouped map 
pandas_udf with window

No new revisions were added by this update.

Summary of changes:
 .../sql/tests/test_pandas_udf_grouped_map.py   | 79 +-
 1 file changed, 78 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (197732e -> 05988b2)

2019-09-17 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 197732e  [SPARK-29125][INFRA] Add Hadoop 2.7 combination to GitHub 
Action
 add 05988b2  [SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas 
UDFs

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/api/python/PythonRunner.scala |   2 +
 python/pyspark/rdd.py  |   1 +
 python/pyspark/serializers.py  |  26 ++
 python/pyspark/sql/cogroup.py  |  98 
 python/pyspark/sql/functions.py|   5 +-
 python/pyspark/sql/group.py|  12 +-
 .../sql/tests/test_pandas_udf_cogrouped_map.py | 280 +
 python/pyspark/sql/udf.py  |  19 ++
 python/pyspark/worker.py   |  98 +++-
 .../spark/sql/catalyst/analysis/Analyzer.scala |   6 +
 .../plans/logical/pythonLogicalOperators.scala |  19 +-
 .../spark/sql/RelationalGroupedDataset.scala   |  46 +++-
 .../spark/sql/execution/SparkStrategies.scala  |   3 +
 .../sql/execution/python/ArrowPythonRunner.scala   |  76 +-
 ...honRunner.scala => BaseArrowPythonRunner.scala} |  92 +--
 .../sql/execution/python/BasePandasGroupExec.scala | 137 ++
 .../python/CogroupedArrowPythonRunner.scala| 113 +
 .../python/FlatMapCoGroupsInPandasExec.scala   |  97 +++
 .../python/FlatMapGroupsInPandasExec.scala |  98 +---
 19 files changed, 965 insertions(+), 263 deletions(-)
 create mode 100644 python/pyspark/sql/cogroup.py
 create mode 100644 python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py
 copy 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/{ArrowPythonRunner.scala
 => BaseArrowPythonRunner.scala} (51%)
 create mode 100644 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BasePandasGroupExec.scala
 create mode 100644 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/CogroupedArrowPythonRunner.scala
 create mode 100644 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (d25cbd4 -> 573b1cb)

2019-08-23 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d25cbd4  [SPARK-28839][CORE] Avoids NPE in context cleaner when 
dynamic allocation and shuffle service are on
 add 573b1cb  [SPARK-28858][ML][PYSPARK] add tree-based transformation in 
the py side

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/classification.py | 54 +---
 python/pyspark/ml/regression.py | 71 ++---
 2 files changed, 84 insertions(+), 41 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-28323][SQL][PYTHON] PythonUDF should be able to use in join condition

2019-07-10 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7858e53  [SPARK-28323][SQL][PYTHON] PythonUDF should be able to use in 
join condition
7858e53 is described below

commit 7858e534d3195d532874a3d90121353895ba3f42
Author: Liang-Chi Hsieh 
AuthorDate: Wed Jul 10 16:29:58 2019 -0700

[SPARK-28323][SQL][PYTHON] PythonUDF should be able to use in join condition

## What changes were proposed in this pull request?

There is a bug in `ExtractPythonUDFs` that produces wrong result 
attributes. It causes a failure when using `PythonUDF`s among multiple child 
plans, e.g., a join. An example is using `PythonUDF`s in a join condition.

```python
>>> left = spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, 
a2=2)])
>>> right = spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, 
b2=1)])
>>> f = udf(lambda a: a, IntegerType())
>>> df = left.join(right, [f("a") == f("b"), left.a1 == right.b1])
>>> df.collect()
19/07/10 12:20:49 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.ArrayIndexOutOfBoundsException: 1
at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36)
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36)
at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:195)
at 
org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70)
...
```

## How was this patch tested?

Added test.

Closes #25091 from viirya/SPARK-28323.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/tests/test_udf.py   | 10 +
 .../sql/execution/python/ExtractPythonUDFs.scala   |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala | 25 ++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 0dafa18..803d471 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -197,6 +197,8 @@ class UDFTests(ReusedSQLTestCase):
 left = self.spark.createDataFrame([Row(a=1)])
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
+# The udf uses attributes from both sides of join, so it is pulled out 
as Filter +
+# Cross join.
 df = left.join(right, f("a", "b"))
 with self.assertRaisesRegexp(AnalysisException, 'Detected implicit 
cartesian product'):
 df.collect()
@@ -243,6 +245,14 @@ class UDFTests(ReusedSQLTestCase):
 runWithJoinType("leftanti", "LeftAnti")
 runWithJoinType("leftsemi", "LeftSemi")
 
+def test_udf_as_join_condition(self):
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, 
b1=3, b2=1)])
+f = udf(lambda a: a, IntegerType())
+
+df = left.join(right, [f("a") == f("b"), left.a1 == right.b1])
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)])
+
 def test_udf_without_arguments(self):
 self.spark.catalog.registerFunction("foo", lambda: "bar")
 [row] = self.spark.sql("SELECT foo()").collect()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 58fe7d5..fc4ded3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -179,7 +179,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
PredicateHelper {
 validUdfs.forall(PythonUDF.isScalarPythonUDF),
 "Can only extract scalar vectorized udf or sql batch udf")
 
-  val resultAttrs = udfs.zipWithIndex.map { case (u, i) =>
+  val resultAttrs = validUdfs.zipWithIndex.map { case (u, i) =>
 AttributeReference(s"pythonUDF$i", u.dataType)()
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala

[spark] branch master updated: [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors

2019-06-26 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c277afb  [SPARK-27992][PYTHON] Allow Python to join with connection 
thread to propagate errors
c277afb is described below

commit c277afb12b61a91272568dd46380c0d0a9958989
Author: Bryan Cutler 
AuthorDate: Wed Jun 26 13:05:41 2019 -0700

[SPARK-27992][PYTHON] Allow Python to join with connection thread to 
propagate errors

## What changes were proposed in this pull request?

Currently with `toLocalIterator()` and `toPandas()` with Arrow enabled, if 
the Spark job being run in the background serving thread errors, it will be 
caught and sent to Python through the PySpark serializer.
This is not the ideal solution because it only catches a SparkException, 
it won't handle an error that occurs in the serializer, and each method has to 
have its own special handling to propagate the error.

This PR instead returns the Python Server object along with the serving 
port and authentication info, so that it allows the Python caller to join with 
the serving thread. During the call to join, the serving thread Future is 
completed either successfully or with an exception. In the latter case, the 
exception will be propagated to Python through the Py4j call.
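
A minimal sketch of the effect described above (assuming an active SparkSession 
named `spark`; the failing UDF is only a stand-in for a job that errors while 
being served by the background JVM thread):

```python
from pyspark.sql.functions import udf

@udf("int")
def boom(x):
    raise RuntimeError("worker failure")  # illustrative failure inside the job

df = spark.range(3).withColumn("v", boom("id"))
try:
    for row in df.toLocalIterator():  # data is served by a background JVM thread
        print(row)
except Exception as e:
    # With this change, the error from the serving thread surfaces here when
    # Python joins it, instead of relying on serializer-specific handling.
    print("propagated error:", e)
```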

## How was this patch tested?

Existing tests

Closes #24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.

Authored-by: Bryan Cutler 
Signed-off-by: Bryan Cutler 
---
 .../org/apache/spark/api/python/PythonRDD.scala| 90 +++--
 .../main/scala/org/apache/spark/api/r/RRDD.scala   |  4 +-
 .../apache/spark/security/SocketAuthHelper.scala   | 19 +
 .../apache/spark/security/SocketAuthServer.scala   | 94 +++---
 .../main/scala/org/apache/spark/util/Utils.scala   |  4 +-
 python/pyspark/rdd.py  | 26 --
 python/pyspark/sql/dataframe.py| 10 ++-
 python/pyspark/sql/tests/test_arrow.py |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 38 -
 9 files changed, 161 insertions(+), 126 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fe25c3a..5b80e14 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
-import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
+import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer, 
SocketFuncServer}
 import org.apache.spark.util._
 
 
@@ -137,8 +137,9 @@ private[spark] object PythonRDD extends Logging {
* (effectively a collect()), but allows you to run on a certain subset of 
partitions,
* or to enable local execution.
*
-   * @return 2-tuple (as a Java array) with the port number of a local socket 
which serves the
-   * data collected from this job, and the secret for authentication.
+   * @return 3-tuple (as a Java array) with the port number of a local socket 
which serves the
+   * data collected from this job, the secret for authentication, and 
a socket auth
+   * server object that can be used to join the JVM serving thread in 
Python.
*/
   def runJob(
   sc: SparkContext,
@@ -156,8 +157,9 @@ private[spark] object PythonRDD extends Logging {
   /**
* A helper function to collect an RDD as an iterator, then serve it via 
socket.
*
-   * @return 2-tuple (as a Java array) with the port number of a local socket 
which serves the
-   * data collected from this job, and the secret for authentication.
+   * @return 3-tuple (as a Java array) with the port number of a local socket 
which serves the
+   * data collected from this job, the secret for authentication, and 
a socket auth
+   * server object that can be used to join the JVM serving thread in 
Python.
*/
   def collectAndServe[T](rdd: RDD[T]): Array[Any] = {
 serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
@@ -168,58 +170,59 @@ private[spark] object PythonRDD extends Logging {
* are collected as separate jobs, by order of index. Partition data is 
first requested by a
* non-zero integer to start a collection job. The response is prefaced by 
an integer with 1
* meaning partition data will be served, 0 meaning the local iterator has 
been consumed,
-   * and -1 meaining an error occurred during collection. This function is 
used by
+   * and -1 meaning an error occurred during collection. Thi

[spark] branch master updated (9df7587 -> d0fbc4d)

2019-06-24 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 9df7587  [MINOR][CORE] Fix line too long in TransportClientFactory
 add d0fbc4d  [SPARK-28003][PYTHON] Allow NaT values when creating Spark 
dataframe from pandas with Arrow

No new revisions were added by this update.

Summary of changes:
 python/pyspark/serializers.py  | 3 +--
 python/pyspark/sql/tests/test_arrow.py | 9 +
 2 files changed, 10 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (9b9d81b -> 113f8c8)

2019-06-21 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 9b9d81b  [SPARK-28131][PYTHON] Update document type conversion between 
Python data and SQL types in normal UDFs (Python 3.7)
 add 113f8c8  [SPARK-28132][PYTHON] Update document type conversion for 
Pandas UDFs (pyarrow 0.13.0, pandas 0.24.2, Python 3.7)

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 47 +++--
 1 file changed, 22 insertions(+), 25 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (54da3bb -> 9b9d81b)

2019-06-21 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 54da3bb  [SPARK-28127][SQL] Micro optimization on TreeNode's 
mapChildren method
 add 9b9d81b  [SPARK-28131][PYTHON] Update document type conversion between 
Python data and SQL types in normal UDFs (Python 3.7)

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 43 -
 1 file changed, 21 insertions(+), 22 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27805][PYTHON] Propagate SparkExceptions during toPandas with arrow enabled

2019-06-04 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f9ca8ab  [SPARK-27805][PYTHON] Propagate SparkExceptions during 
toPandas with arrow enabled
f9ca8ab is described below

commit f9ca8ab196b1967a7603ca36d62fc15d1391842e
Author: David Vogelbacher 
AuthorDate: Tue Jun 4 10:10:27 2019 -0700

[SPARK-27805][PYTHON] Propagate SparkExceptions during toPandas with arrow 
enabled

## What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/24070, we now propagate 
SparkExceptions that are encountered during the collect in the Java process to 
the Python process.

Fixes https://jira.apache.org/jira/browse/SPARK-27805

## How was this patch tested?
Added a new unit test

Closes #24677 from dvogelbacher/dv/betterErrorMsgWhenUsingArrow.

Authored-by: David Vogelbacher 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/serializers.py  |  6 +++-
 python/pyspark/sql/tests/test_arrow.py | 12 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 40 +++---
 3 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 6058e94..516ee7e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -206,8 +206,12 @@ class ArrowCollectSerializer(Serializer):
 for batch in self.serializer.load_stream(stream):
 yield batch
 
-# load the batch order indices
+# load the batch order indices or propagate any error that occurred in 
the JVM
 num = read_int(stream)
+if num == -1:
+error_msg = UTF8Deserializer().loads(stream)
+raise RuntimeError("An error occurred while calling "
+   "ArrowCollectSerializer.load_stream: 
{}".format(error_msg))
 batch_order = []
 for i in xrange(num):
 index = read_int(stream)
diff --git a/python/pyspark/sql/tests/test_arrow.py 
b/python/pyspark/sql/tests/test_arrow.py
index 7871af4..cb51241 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -23,6 +23,7 @@ import unittest
 import warnings
 
 from pyspark.sql import Row
+from pyspark.sql.functions import udf
 from pyspark.sql.types import *
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, 
have_pyarrow, \
 pandas_requirement_message, pyarrow_requirement_message
@@ -205,6 +206,17 @@ class ArrowTests(ReusedSQLTestCase):
 self.assertEqual(pdf.columns[0], "field1")
 self.assertTrue(pdf.empty)
 
+def test_propagates_spark_exception(self):
+df = self.spark.range(3).toDF("i")
+
+def raise_exception():
+raise Exception("My error")
+exception_udf = udf(raise_exception, IntegerType())
+df = df.withColumn("error", exception_udf())
+with QuietTest(self.sc):
+with self.assertRaisesRegexp(RuntimeError, 'My error'):
+df.toPandas()
+
 def _createDataFrame_toggle(self, pdf, schema=None):
 with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": 
False}):
 df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f3377f3..a80aade 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Evolving, Experimental, 
Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -3321,20 +3321,34 @@ class Dataset[T] private[sql](
 }
   }
 
-val arrowBatchRdd = toArrowBatchRdd(plan)
-sparkSession.sparkContext.runJob(
-  arrowBatchRdd,
-  (it: Iterator[Array[Byte]]) => it.toArray,
-  handlePartitionBatches)
+var sparkException: Option[SparkException] = None
+try {
+  val arrowBatchRdd = toArrowBatchRdd(plan)
+  sparkSession.sparkContext.runJob(
+arrowBatchRdd,
+(it: Iterator[Array[Byte]]) => it.toArray,
+handlePartitionBatches)
+} catch {
+  case e: SparkException =>
+sparkException = Some(e)
+}
 
-// After processing all partitions, end the stream and write batch 

[spark] branch master updated (2f55809 -> 5e79ae3)

2019-05-07 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 2f55809  [SPARK-27294][SS] Add multi-cluster Kafka delegation token
 add 5e79ae3  [SPARK-23961][SPARK-27548][PYTHON] Fix error when 
toLocalIterator goes out of scope and properly raise errors from worker

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/api/python/PythonRDD.scala| 57 ++-
 python/pyspark/rdd.py  | 66 --
 python/pyspark/sql/dataframe.py|  4 +-
 python/pyspark/sql/tests/test_dataframe.py | 28 +
 python/pyspark/tests/test_rdd.py   | 31 +-
 5 files changed, 174 insertions(+), 12 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (8375103 -> 9623420)

2019-05-01 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 8375103  [SPARK-27557][DOC] Add copy button to Python API docs for 
easier copying of code-blocks
 add 9623420  [SPARK-27276][PYTHON][DOCS][FOLLOW-UP] Update documentation 
about Arrow version in PySpark as well

No new revisions were added by this update.

Summary of changes:
 docs/sql-pyspark-pandas-with-arrow.md | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in Scalar Pandas UDF.

2019-03-25 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 594be7a  [SPARK-27240][PYTHON] Use pandas DataFrame for struct type 
argument in Scalar Pandas UDF.
594be7a is described below

commit 594be7a911584a05f875f217ac39af9a5eeff049
Author: Takuya UESHIN 
AuthorDate: Mon Mar 25 11:26:09 2019 -0700

[SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in 
Scalar Pandas UDF.

## What changes were proposed in this pull request?

We now support returning a pandas DataFrame for a struct type in a Scalar 
Pandas UDF.

If we chain another Pandas UDF after a Scalar Pandas UDF that returns a pandas 
DataFrame, the argument of the chained UDF will be a pandas DataFrame, but 
currently we don't support a pandas DataFrame as an argument of a Scalar Pandas 
UDF. That means there is an inconsistency between the chained UDF and the 
single UDF.

We should support taking pandas DataFrame for struct type argument in 
Scalar Pandas UDF to be consistent.
Currently pyarrow >=0.11 is supported.
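
A minimal sketch of the behavior this enables (assuming an active SparkSession 
named `spark`; the column names `a` and `b` are only illustrative):

```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct

@pandas_udf("long")
def add_fields(s):
    # With this change, a struct-typed argument arrives as a pandas.DataFrame
    # whose columns are the struct fields.
    return s["a"] + s["b"]

df = spark.range(3).select(col("id").alias("a"), (col("id") * 10).alias("b"))
df.select(add_fields(struct("a", "b"))).show()
```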

## How was this patch tested?

Modified and added some tests.

Closes #24177 from ueshin/issues/SPARK-27240/structtype_argument.

Authored-by: Takuya UESHIN 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/serializers.py  | 29 +++
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 33 ++
 python/pyspark/sql/types.py| 10 +++
 python/pyspark/worker.py   |  6 +++-
 4 files changed, 72 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 58f7552..ed419db 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -260,11 +260,10 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 self._safecheck = safecheck
 self._assign_cols_by_name = assign_cols_by_name
 
-def arrow_to_pandas(self, arrow_column):
-from pyspark.sql.types import from_arrow_type, \
-_arrow_column_to_pandas, _check_series_localize_timestamps
+def arrow_to_pandas(self, arrow_column, data_type):
+from pyspark.sql.types import _arrow_column_to_pandas, 
_check_series_localize_timestamps
 
-s = _arrow_column_to_pandas(arrow_column, 
from_arrow_type(arrow_column.type))
+s = _arrow_column_to_pandas(arrow_column, data_type)
 s = _check_series_localize_timestamps(s, self._timezone)
 return s
 
@@ -366,8 +365,10 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 """
 batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
 import pyarrow as pa
+from pyspark.sql.types import from_arrow_type
 for batch in batches:
-yield [self.arrow_to_pandas(c) for c in 
pa.Table.from_batches([batch]).itercolumns()]
+yield [self.arrow_to_pandas(c, from_arrow_type(c.type))
+   for c in pa.Table.from_batches([batch]).itercolumns()]
 
 def __repr__(self):
 return "ArrowStreamPandasSerializer"
@@ -378,6 +379,24 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 Serializer used by Python worker to evaluate Pandas UDFs
 """
 
+def __init__(self, timezone, safecheck, assign_cols_by_name, 
df_for_struct=False):
+super(ArrowStreamPandasUDFSerializer, self) \
+.__init__(timezone, safecheck, assign_cols_by_name)
+self._df_for_struct = df_for_struct
+
+def arrow_to_pandas(self, arrow_column, data_type):
+from pyspark.sql.types import StructType, \
+_arrow_column_to_pandas, _check_dataframe_localize_timestamps
+
+if self._df_for_struct and type(data_type) == StructType:
+import pandas as pd
+series = [_arrow_column_to_pandas(column, 
field.dataType).rename(field.name)
+  for column, field in zip(arrow_column.flatten(), 
data_type)]
+s = _check_dataframe_localize_timestamps(pd.concat(series, 
axis=1), self._timezone)
+else:
+s = super(ArrowStreamPandasUDFSerializer, 
self).arrow_to_pandas(arrow_column, data_type)
+return s
+
 def dump_stream(self, iterator, stream):
 """
 Override because Pandas UDFs require a START_ARROW_STREAM before the 
Arrow stream is sent.
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 28b6db2..7df918b 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -270,6 +270,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
 def tes

[spark] branch master updated: [SPARK-23836][PYTHON] Add support for StructType return in Scalar Pandas UDF

2019-03-07 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ddc2052  [SPARK-23836][PYTHON] Add support for StructType return in 
Scalar Pandas UDF
ddc2052 is described below

commit ddc2052ebd247aa2a8dad34fd5c1cd345fa45118
Author: Bryan Cutler 
AuthorDate: Thu Mar 7 08:52:24 2019 -0800

[SPARK-23836][PYTHON] Add support for StructType return in Scalar Pandas UDF

## What changes were proposed in this pull request?

This change adds support for returning StructType from a scalar Pandas UDF, 
where the return value of the function is a pandas.DataFrame. Nested structs 
are not supported and an error will be raised; child types can be any other 
currently supported type.
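
A minimal sketch (assuming an active SparkSession named `spark`; the field 
names and sample data are only illustrative):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("first string, last string")
def split_name(names):
    # Return a pandas.DataFrame whose columns match the declared struct fields.
    parts = names.str.split(" ", n=1, expand=True)
    return parts.rename(columns={0: "first", 1: "last"})

df = spark.createDataFrame([("John Doe",), ("Jane Roe",)], ["name"])
df.select(split_name("name").alias("parts")).show()
```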

## How was this patch tested?

Added additional unit tests to `test_pandas_udf_scalar`

Closes #23900 from 
BryanCutler/pyspark-support-scalar_udf-StructType-SPARK-23836.

Authored-by: Bryan Cutler 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/serializers.py  | 39 +--
 python/pyspark/sql/functions.py| 12 +++-
 python/pyspark/sql/session.py  |  3 +-
 .../sql/tests/test_pandas_udf_grouped_map.py   |  1 +
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 81 +-
 python/pyspark/sql/types.py|  8 ++-
 python/pyspark/sql/udf.py  |  5 +-
 python/pyspark/worker.py   | 12 +++-
 8 files changed, 149 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index a2c59fe..0c3c68e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -64,6 +64,7 @@ if sys.version < '3':
 from itertools import izip as zip, imap as map
 else:
 import pickle
+basestring = unicode = str
 xrange = range
 pickle_protocol = pickle.HIGHEST_PROTOCOL
 
@@ -244,7 +245,7 @@ class ArrowStreamSerializer(Serializer):
 return "ArrowStreamSerializer"
 
 
-def _create_batch(series, timezone, safecheck):
+def _create_batch(series, timezone, safecheck, assign_cols_by_name):
 """
 Create an Arrow record batch from the given pandas.Series or list of 
Series, with optional type.
 
@@ -254,6 +255,7 @@ def _create_batch(series, timezone, safecheck):
 """
 import decimal
 from distutils.version import LooseVersion
+import pandas as pd
 import pyarrow as pa
 from pyspark.sql.types import _check_series_convert_timestamps_internal
 # Make input conform to [(series1, type1), (series2, type2), ...]
@@ -295,7 +297,34 @@ def _create_batch(series, timezone, safecheck):
 raise RuntimeError(error_msg % (s.dtype, t), e)
 return array
 
-arrs = [create_array(s, t) for s, t in series]
+arrs = []
+for s, t in series:
+if t is not None and pa.types.is_struct(t):
+if not isinstance(s, pd.DataFrame):
+raise ValueError("A field of type StructType expects a 
pandas.DataFrame, "
+ "but got: %s" % str(type(s)))
+
+# Input partition and result pandas.DataFrame empty, make empty 
Arrays with struct
+if len(s) == 0 and len(s.columns) == 0:
+arrs_names = [(pa.array([], type=field.type), field.name) for 
field in t]
+# Assign result columns by schema name if user labeled with strings
+elif assign_cols_by_name and any(isinstance(name, basestring) for 
name in s.columns):
+arrs_names = [(create_array(s[field.name], field.type), 
field.name) for field in t]
+# Assign result columns by  position
+else:
+arrs_names = [(create_array(s[s.columns[i]], field.type), 
field.name)
+  for i, field in enumerate(t)]
+
+struct_arrs, struct_names = zip(*arrs_names)
+
+# TODO: from_arrays args switched for v0.9.0, remove when bump 
minimum pyarrow version
+if LooseVersion(pa.__version__) < LooseVersion("0.9.0"):
+arrs.append(pa.StructArray.from_arrays(struct_names, 
struct_arrs))
+else:
+arrs.append(pa.StructArray.from_arrays(struct_arrs, 
struct_names))
+else:
+arrs.append(create_array(s, t))
+
 return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
xrange(len(arrs))])
 
 
@@ -304,10 +333,11 @@ class ArrowStreamPandasSerializer(Serializer):
 Serializes Pandas.Series as Arrow data with Arrow streaming format.
 """
 
-def __init__(self, timezone, safecheck):
+def __init__(self, timezone, safecheck, assign_cols_by_name):

[spark] branch master updated: [SPARK-26676][PYTHON] Make HiveContextSQLTests.test_unbounded_frames test compatible with Python 2 and PyPy

2019-01-21 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 75d8449  [SPARK-26676][PYTHON] Make 
HiveContextSQLTests.test_unbounded_frames test compatible with Python 2 and PyPy
75d8449 is described below

commit 75d84498a4e649dba0b8c9ec35965066246d1bd2
Author: Hyukjin Kwon 
AuthorDate: Mon Jan 21 14:27:17 2019 -0800

[SPARK-26676][PYTHON] Make HiveContextSQLTests.test_unbounded_frames test 
compatible with Python 2 and PyPy

## What changes were proposed in this pull request?

This particular test is being skipped on PyPy and Python 2.

```
Skipped tests in pyspark.sql.tests.test_context with pypy:
test_unbounded_frames 
(pyspark.sql.tests.test_context.HiveContextSQLTests) ... skipped "Unittest < 
3.3 doesn't support mocking"

Skipped tests in pyspark.sql.tests.test_context with python2.7:
test_unbounded_frames 
(pyspark.sql.tests.test_context.HiveContextSQLTests) ... skipped "Unittest < 
3.3 doesn't support mocking"
```

We don't have to use the unittest mocking added in Python 3.3, and it looks 
like the test itself isn't compatible with Python 2.

This PR makes:
 - Manually monkey-patch `sys.maxsize` to get rid of unittest 3.3 condition
 - Use the built-in `reload` in Python 2, and `importlib.reload` in Python 3

## How was this patch tested?

Manually tested, and unit test is fixed.

Closes #23604 from HyukjinKwon/test-window.

Authored-by: Hyukjin Kwon 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/sql/tests/test_context.py | 36 +++-
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/sql/tests/test_context.py 
b/python/pyspark/sql/tests/test_context.py
index 918f4ad..dc81426 100644
--- a/python/pyspark/sql/tests/test_context.py
+++ b/python/pyspark/sql/tests/test_context.py
@@ -19,6 +19,11 @@ import shutil
 import sys
 import tempfile
 import unittest
+try:
+from importlib import reload  # Python 3.4+ only.
+except ImportError:
+# Otherwise, we will stick to Python 2's built-in reload.
+pass
 
 import py4j
 
@@ -216,12 +221,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 parse_result = 
df.select(functions.to_date(functions.col("dateCol"))).first()
 self.assertEquals(date(2017, 1, 22), 
parse_result['to_date(`dateCol`)'])
 
-@unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't 
support mocking")
 def test_unbounded_frames(self):
-from unittest.mock import patch
 from pyspark.sql import functions as F
 from pyspark.sql import window
-import importlib
 
 df = self.spark.range(0, 3)
 
@@ -235,22 +237,18 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 F.count("*").over(window.Window.rangeBetween(-sys.maxsize, 
sys.maxsize))
 ).columns[0]
 
-with patch("sys.maxsize", 2 ** 31 - 1):
-importlib.reload(window)
-self.assertTrue(rows_frame_match())
-self.assertTrue(range_frame_match())
-
-with patch("sys.maxsize", 2 ** 63 - 1):
-importlib.reload(window)
-self.assertTrue(rows_frame_match())
-self.assertTrue(range_frame_match())
-
-with patch("sys.maxsize", 2 ** 127 - 1):
-importlib.reload(window)
-self.assertTrue(rows_frame_match())
-self.assertTrue(range_frame_match())
-
-importlib.reload(window)
+for new_maxsize in [2 ** 31 - 1, 2 ** 63 - 1, 2 ** 127 - 1]:
+old_maxsize = sys.maxsize
+sys.maxsize = new_maxsize
+try:
+# Manually reload window module to use monkey-patched 
sys.maxsize.
+reload(window)
+self.assertTrue(rows_frame_match())
+self.assertTrue(range_frame_match())
+finally:
+sys.maxsize = old_maxsize
+
+reload(window)
 
 
 if __name__ == "__main__":


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-26349][PYSPARK] Forbid insecure py4j gateways

2019-01-08 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 32515d2  [SPARK-26349][PYSPARK] Forbid insecure py4j gateways
32515d2 is described below

commit 32515d205a4de4d8838226fa5e5c4e4f66935193
Author: Imran Rashid 
AuthorDate: Tue Jan 8 11:26:36 2019 -0800

[SPARK-26349][PYSPARK] Forbid insecure py4j gateways

Spark always creates secure Py4J connections between Java and Python,
but it also allows users to pass in their own connection. This change ensures
that even passed-in connections are secure.

Added test cases verifying the failure with a (mocked) insecure gateway.

This is closely related to SPARK-26019, but this entirely forbids the
insecure connection, rather than creating the "escape-hatch".

Closes #23441 from squito/SPARK-26349.

Authored-by: Imran Rashid 
Signed-off-by: Bryan Cutler 
---
 python/pyspark/context.py|  5 +
 python/pyspark/tests/test_context.py | 10 ++
 2 files changed, 15 insertions(+)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6137ed2..64178eb 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -115,6 +115,11 @@ class SparkContext(object):
 ValueError:...
 """
 self._callsite = first_spark_call() or CallSite(None, None, None)
+if gateway is not None and gateway.gateway_parameters.auth_token is 
None:
+raise ValueError(
+"You are trying to pass an insecure Py4j gateway to Spark. 
This"
+" is not allowed as it is a security risk.")
+
 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
 try:
 self._do_init(master, appName, sparkHome, pyFiles, environment, 
batchSize, serializer,
diff --git a/python/pyspark/tests/test_context.py 
b/python/pyspark/tests/test_context.py
index 201baf4..18d9cd4 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -20,6 +20,7 @@ import tempfile
 import threading
 import time
 import unittest
+from collections import namedtuple
 
 from pyspark import SparkFiles, SparkContext
 from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, 
QuietTest, SPARK_HOME
@@ -246,6 +247,15 @@ class ContextTests(unittest.TestCase):
 with SparkContext() as sc:
 self.assertGreater(sc.startTime, 0)
 
+def test_forbid_insecure_gateway(self):
+# Fail immediately if you try to create a SparkContext
+# with an insecure gateway
+parameters = namedtuple('MockGatewayParameters', 'auth_token')(None)
+mock_insecure_gateway = namedtuple('MockJavaGateway', 
'gateway_parameters')(parameters)
+with self.assertRaises(ValueError) as context:
+SparkContext(gateway=mock_insecure_gateway)
+self.assertIn("insecure Py4j gateway", str(context.exception))
+
 
 if __name__ == "__main__":
 from pyspark.tests.test_context import *


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24333][ML][PYTHON] Add fit with validation set to spark.ml GBT: Python API

2018-12-07 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 3b8ae2373 -> 20278e719


[SPARK-24333][ML][PYTHON] Add fit with validation set to spark.ml GBT: Python 
API

## What changes were proposed in this pull request?

Add validationIndicatorCol and validationTol to GBT Python.
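
A minimal usage sketch of the new params (assuming an active SparkSession named 
`spark`; the tiny dataset and the `isVal` column name are only illustrative):

```python
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.linalg import Vectors

# "isVal" marks the rows held out for validation-based early stopping.
train_df = spark.createDataFrame([
    (Vectors.dense(0.0), 0.0, False),
    (Vectors.dense(0.1), 0.0, False),
    (Vectors.dense(0.9), 1.0, False),
    (Vectors.dense(1.0), 1.0, False),
    (Vectors.dense(0.2), 0.0, True),
    (Vectors.dense(0.8), 1.0, True),
], ["features", "label", "isVal"])

gbt = GBTClassifier(maxIter=20).setValidationIndicatorCol("isVal")
print(gbt.getValidationTol())   # default tolerance: 0.01
model = gbt.fit(train_df)       # stops early once validation error stops improving
```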

## How was this patch tested?

Add test in doctest to test the new API.

Closes #21465 from huaxingao/spark-24333.

Authored-by: Huaxin Gao 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20278e71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20278e71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20278e71

Branch: refs/heads/master
Commit: 20278e719e28fc5d7a8069e0498a8df143ecee90
Parents: 3b8ae23
Author: Huaxin Gao 
Authored: Fri Dec 7 13:53:35 2018 -0800
Committer: Bryan Cutler 
Committed: Fri Dec 7 13:53:35 2018 -0800

--
 python/pyspark/ml/classification.py |  81 ---
 .../pyspark/ml/param/_shared_params_code_gen.py |   5 +-
 python/pyspark/ml/param/shared.py   |  71 -
 python/pyspark/ml/regression.py | 100 +--
 4 files changed, 169 insertions(+), 88 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/20278e71/python/pyspark/ml/classification.py
--
diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index ce02851..6ddfce9 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -23,7 +23,7 @@ from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
 from pyspark.ml.param.shared import *
 from pyspark.ml.regression import DecisionTreeModel, 
DecisionTreeRegressionModel, \
-RandomForestParams, TreeEnsembleModel, TreeEnsembleParams
+GBTParams, HasVarianceImpurity, RandomForestParams, TreeEnsembleModel, 
TreeEnsembleParams
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
 from pyspark.ml.wrapper import JavaWrapper
@@ -895,15 +895,6 @@ class TreeClassifierParams(object):
 return self.getOrDefault(self.impurity)
 
 
-class GBTParams(TreeEnsembleParams):
-"""
-Private class to track supported GBT params.
-
-.. versionadded:: 1.4.0
-"""
-supportedLossTypes = ["logistic"]
-
-
 @inherit_doc
 class DecisionTreeClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, 
HasPredictionCol,
  HasProbabilityCol, HasRawPredictionCol, 
DecisionTreeParams,
@@ -1174,9 +1165,31 @@ class RandomForestClassificationModel(TreeEnsembleModel, 
JavaClassificationModel
 return [DecisionTreeClassificationModel(m) for m in 
list(self._call_java("trees"))]
 
 
+class GBTClassifierParams(GBTParams, HasVarianceImpurity):
+"""
+Private class to track supported GBTClassifier params.
+
+.. versionadded:: 3.0.0
+"""
+
+supportedLossTypes = ["logistic"]
+
+lossType = Param(Params._dummy(), "lossType",
+ "Loss function which GBT tries to minimize 
(case-insensitive). " +
+ "Supported options: " + ", ".join(supportedLossTypes),
+ typeConverter=TypeConverters.toString)
+
+@since("1.4.0")
+def getLossType(self):
+"""
+Gets the value of lossType or its default value.
+"""
+return self.getOrDefault(self.lossType)
+
+
 @inherit_doc
-class GBTClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, 
HasPredictionCol, HasMaxIter,
-GBTParams, HasCheckpointInterval, HasStepSize, HasSeed, 
JavaMLWritable,
+class GBTClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, 
HasPredictionCol,
+GBTClassifierParams, HasCheckpointInterval, HasSeed, 
JavaMLWritable,
 JavaMLReadable):
 """
 `Gradient-Boosted Trees (GBTs) 
`_
@@ -1242,32 +1255,28 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredictionCol
 [0.25..., 0.23..., 0.21..., 0.19..., 0.18...]
 >>> model.numClasses
 2
+>>> gbt = gbt.setValidationIndicatorCol("validationIndicator")
+>>> gbt.getValidationIndicatorCol()
+'validationIndicator'
+>>> gbt.getValidationTol()
+0.01
 
 .. versionadded:: 1.4.0
 """
 
-lossType = Param(Params._dummy(), "lossType",
- "Loss function which GBT tries to minimize 
(case-insensitive). " +
- "Supported options: " + ", 
".join(GBTParams.supportedLossTypes),
- typeConverter=TypeConverters.toString)
-
-stepSize = Param(Params._dummy(), "stepSize",
- "Step size (a.k.a. learning rate) in interval 

spark git commit: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance

2018-12-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master ab76900fe -> ecaa495b1


[SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record 
batches to improve performance

## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM 
out-of-order must be buffered before they can be sent to Python. This causes an 
excess of memory to be used in the driver JVM and increases the time it takes 
to complete because data must sit in the JVM waiting for preceding partitions 
to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the 
JVM, followed by a list of partition indices so that Python can assemble the 
data in the correct order. This way, data is not buffered at the JVM and there 
is no waiting on particular partitions so performance will be increased.
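
A rough sketch of the reordering step this describes (illustrative only, not 
the exact PySpark code):

```python
def collect_ordered(batch_stream):
    # The serializer yields Arrow record batches in arrival order, then a
    # single list of partition indices as the very last item.
    results = list(batch_stream)
    batches, batch_order = results[:-1], results[-1]
    # Restore the original partition order before building the result table.
    return [batches[i] for i in batch_order]
```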

Followup to #21546

## How was this patch tested?

Added a new test with a large number of batches per partition, and a test that 
forces a small delay in the first partition. These verify that partitions are 
collected out-of-order and are then put in the correct order in Python.

## Performance Tests - toPandas

Tests were run on a 4-node standalone cluster with 32 cores total, Ubuntu 
14.04.1, and OpenJDK 8.
Wall clock time to execute `toPandas()` was measured, taking the average best 
time of 5 runs/5 loops each.

Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", 
rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", 
rand())
for i in range(5):
start = time.time()
_ = df.toPandas()
elapsed = time.time() - start
```

Spark config
```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream | This PR
---|---
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
--|--
5.1134364 | 4.3898886

Speedup of **1.164821449**

Closes #22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecaa495b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecaa495b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecaa495b

Branch: refs/heads/master
Commit: ecaa495b1fe532c36e952ccac42f4715809476af
Parents: ab76900
Author: Bryan Cutler 
Authored: Thu Dec 6 10:07:28 2018 -0800
Committer: Bryan Cutler 
Committed: Thu Dec 6 10:07:28 2018 -0800

--
 python/pyspark/serializers.py   | 33 ++
 python/pyspark/sql/dataframe.py | 11 -
 python/pyspark/sql/tests/test_arrow.py  | 28 
 .../scala/org/apache/spark/sql/Dataset.scala| 45 +++-
 4 files changed, 95 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ecaa495b/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ff9a612..f3ebd37 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -185,6 +185,39 @@ class FramedSerializer(Serializer):
 raise NotImplementedError
 
 
+class ArrowCollectSerializer(Serializer):
+"""
+Deserialize a stream of batches followed by batch order information. Used 
in
+DataFrame._collectAsArrow() after invoking 
Dataset.collectAsArrowToPython() in the JVM.
+"""
+
+def __init__(self):
+self.serializer = ArrowStreamSerializer()
+
+def dump_stream(self, iterator, stream):
+return self.serializer.dump_stream(iterator, stream)
+
+def load_stream(self, stream):
+"""
+Load a stream of un-ordered Arrow RecordBatches, where the last 
iteration yields
+a list of indices that can be used to put the RecordBatches in the 
correct order.
+"""
+# load the batches
+for batch in self.serializer.load_stream(stream):
+yield batch
+
+# load the batch order indices
+num = read_int(stream)
+batch_order = []
+for i in xrange(num):
+index = read_int(stream)
+batch_order.append(index)
+yield batch_order
+
+def __repr__(self):
+return "ArrowCollectSerializer(%s)" % self.serializer
+
+
 class ArrowStreamSerializer(Serializer):
 """
 Serializes Arrow record batches as a stream.

http://git-wip-us.apache.org/repos/asf/spark/blob/ecaa495b/python/pyspark/sql/dataframe.py

spark git commit: [SPARK-26033][SPARK-26034][PYTHON][FOLLOW-UP] Small cleanup and deduplication in ml/mllib tests

2018-12-03 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 187bb7d00 -> 518a3d10c


[SPARK-26033][SPARK-26034][PYTHON][FOLLOW-UP] Small cleanup and deduplication 
in ml/mllib tests

## What changes were proposed in this pull request?

This PR is a small follow-up that puts some logic and functions into a smaller 
scope, makes them localized, and deduplicates code.

## How was this patch tested?

Manually tested. Jenkins tests as well.

Closes #23200 from HyukjinKwon/followup-SPARK-26034-SPARK-26033.

Authored-by: Hyukjin Kwon 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/518a3d10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/518a3d10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/518a3d10

Branch: refs/heads/master
Commit: 518a3d10c87bb6d7d442eba7265fc026aa54473e
Parents: 187bb7d
Author: Hyukjin Kwon 
Authored: Mon Dec 3 14:03:10 2018 -0800
Committer: Bryan Cutler 
Committed: Mon Dec 3 14:03:10 2018 -0800

--
 python/pyspark/ml/tests/test_linalg.py| 44 ---
 python/pyspark/mllib/tests/test_algorithms.py |  8 +--
 python/pyspark/mllib/tests/test_linalg.py | 62 +-
 python/pyspark/testing/mllibutils.py  |  5 --
 4 files changed, 51 insertions(+), 68 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/518a3d10/python/pyspark/ml/tests/test_linalg.py
--
diff --git a/python/pyspark/ml/tests/test_linalg.py 
b/python/pyspark/ml/tests/test_linalg.py
index 71cad5d..995bc35 100644
--- a/python/pyspark/ml/tests/test_linalg.py
+++ b/python/pyspark/ml/tests/test_linalg.py
@@ -20,25 +20,17 @@ import array as pyarray
 
 from numpy import arange, array, array_equal, inf, ones, tile, zeros
 
+from pyspark.serializers import PickleSerializer
 from pyspark.ml.linalg import DenseMatrix, DenseVector, MatrixUDT, 
SparseMatrix, SparseVector, \
 Vector, VectorUDT, Vectors
-from pyspark.testing.mllibutils import make_serializer, MLlibTestCase
+from pyspark.testing.mllibutils import MLlibTestCase
 from pyspark.sql import Row
 
 
-ser = make_serializer()
-
-
-def _squared_distance(a, b):
-if isinstance(a, Vector):
-return a.squared_distance(b)
-else:
-return b.squared_distance(a)
-
-
 class VectorTests(MLlibTestCase):
 
 def _test_serialize(self, v):
+ser = PickleSerializer()
 self.assertEqual(v, ser.loads(ser.dumps(v)))
 jvec = 
self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(v)))
 nv = 
ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvec)))
@@ -77,24 +69,30 @@ class VectorTests(MLlibTestCase):
 self.assertEqual(7.0, sv.dot(arr))
 
 def test_squared_distance(self):
+def squared_distance(a, b):
+if isinstance(a, Vector):
+return a.squared_distance(b)
+else:
+return b.squared_distance(a)
+
 sv = SparseVector(4, {1: 1, 3: 2})
 dv = DenseVector(array([1., 2., 3., 4.]))
 lst = DenseVector([4, 3, 2, 1])
 lst1 = [4, 3, 2, 1]
 arr = pyarray.array('d', [0, 2, 1, 3])
 narr = array([0, 2, 1, 3])
-self.assertEqual(15.0, _squared_distance(sv, dv))
-self.assertEqual(25.0, _squared_distance(sv, lst))
-self.assertEqual(20.0, _squared_distance(dv, lst))
-self.assertEqual(15.0, _squared_distance(dv, sv))
-self.assertEqual(25.0, _squared_distance(lst, sv))
-self.assertEqual(20.0, _squared_distance(lst, dv))
-self.assertEqual(0.0, _squared_distance(sv, sv))
-self.assertEqual(0.0, _squared_distance(dv, dv))
-self.assertEqual(0.0, _squared_distance(lst, lst))
-self.assertEqual(25.0, _squared_distance(sv, lst1))
-self.assertEqual(3.0, _squared_distance(sv, arr))
-self.assertEqual(3.0, _squared_distance(sv, narr))
+self.assertEqual(15.0, squared_distance(sv, dv))
+self.assertEqual(25.0, squared_distance(sv, lst))
+self.assertEqual(20.0, squared_distance(dv, lst))
+self.assertEqual(15.0, squared_distance(dv, sv))
+self.assertEqual(25.0, squared_distance(lst, sv))
+self.assertEqual(20.0, squared_distance(lst, dv))
+self.assertEqual(0.0, squared_distance(sv, sv))
+self.assertEqual(0.0, squared_distance(dv, dv))
+self.assertEqual(0.0, squared_distance(lst, lst))
+self.assertEqual(25.0, squared_distance(sv, lst1))
+self.assertEqual(3.0, squared_distance(sv, arr))
+self.assertEqual(3.0, squared_distance(sv, narr))
 
 def test_hash(self):
 v1 = DenseVector([0.0, 1.0, 0.0, 5.5])


spark git commit: [SPARK-25798][PYTHON] Internally document type conversion between Pandas data and SQL types in Pandas UDFs

2018-10-24 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master b19a28dea -> 7251be0c0


[SPARK-25798][PYTHON] Internally document type conversion between Pandas data 
and SQL types in Pandas UDFs

## What changes were proposed in this pull request?

We are facing some problems with type conversions between Pandas data and SQL 
types in Pandas UDFs.
It's even difficult to identify the problems (see #20163 and #22610).

This PR aims to internally document the type conversion table. Some of the 
conversions look buggy and we should fix them.

The table can be generated via the code below:

```python
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf

columns = [
('none', 'object(NoneType)'),
('bool', 'bool'),
('int8', 'int8'),
('int16', 'int16'),
('int32', 'int32'),
('int64', 'int64'),
('uint8', 'uint8'),
('uint16', 'uint16'),
('uint32', 'uint32'),
('uint64', 'uint64'),
('float64', 'float16'),
('float64', 'float32'),
('float64', 'float64'),
('date', 'datetime64[ns]'),
('tz_aware_dates', 'datetime64[ns, US/Eastern]'),
('string', 'object(string)'),
('decimal', 'object(Decimal)'),
('array', 'object(array[int32])'),
('float128', 'float128'),
('complex64', 'complex64'),
('complex128', 'complex128'),
('category', 'category'),
('tdeltas', 'timedelta64[ns]'),
]

def create_dataframe():
import pandas as pd
import numpy as np
import decimal
pdf = pd.DataFrame({
'none': [None, None],
'bool': [True, False],
'int8': np.arange(1, 3).astype('int8'),
'int16': np.arange(1, 3).astype('int16'),
'int32': np.arange(1, 3).astype('int32'),
'int64': np.arange(1, 3).astype('int64'),
'uint8': np.arange(1, 3).astype('uint8'),
'uint16': np.arange(1, 3).astype('uint16'),
'uint32': np.arange(1, 3).astype('uint32'),
'uint64': np.arange(1, 3).astype('uint64'),
'float16': np.arange(1, 3).astype('float16'),
'float32': np.arange(1, 3).astype('float32'),
'float64': np.arange(1, 3).astype('float64'),
'float128': np.arange(1, 3).astype('float128'),
'complex64': np.arange(1, 3).astype('complex64'),
'complex128': np.arange(1, 3).astype('complex128'),
'string': list('ab'),
'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 
2, 3], dtype=np.int32)]),
'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]),
'date': pd.date_range('19700101', periods=2).values,
'category': pd.Series(list("AB")).astype('category')})
pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]]
pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, 
tz='US/Eastern')
return pdf

types =  [
BooleanType(),
ByteType(),
ShortType(),
IntegerType(),
LongType(),
FloatType(),
DoubleType(),
DateType(),
TimestampType(),
StringType(),
DecimalType(10, 0),
ArrayType(IntegerType()),
MapType(StringType(), IntegerType()),
StructType([StructField("_1", IntegerType())]),
BinaryType(),
]

df = spark.range(2).repartition(1)
results = []
count = 0
total = len(types) * len(columns)
values = []
spark.sparkContext.setLogLevel("FATAL")
for t in types:
result = []
for column, pandas_t in columns:
v = create_dataframe()[column][0]
values.append(v)
try:
row = df.select(pandas_udf(lambda _: create_dataframe()[column], 
t)(df.id)).first()
ret_str = repr(row[0])
except Exception:
ret_str = "X"
result.append(ret_str)
progress = "SQL Type: [%s]\n  Pandas Value(Type): %s(%s)]\n  Result 
Python Value: [%s]" % (
t.simpleString(), v, pandas_t, ret_str)
count += 1
print("%s/%s:\n  %s" % (count, total, progress))
results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: 
"%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns)))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, 
False)
print("\n".join(map(lambda line: "# %s  # noqa" % line, 
strings.strip().split("\n"

```

This code is compatible with both Python 2 and 3 but the table was generated 
under Python 2.

## How was this patch tested?

Manually tested and lint check.

Closes #22795 from HyukjinKwon/SPARK-25798.

Authored-by: hyukjinkwon 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7251be0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7251be0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7251be0c

Branch: refs/heads/master
Commit: 7251be0c04f0380208e0197e559158a9e1400868
Parents: b19a28d
Author: hyukjinkwon 
Authored: Wed Oct 24 10:04:17 2018 -0700
Committer: Bryan Cutler 
Committed: Wed 

spark git commit: [SPARK-23672][PYTHON] Document support for nested return types in scalar with arrow udfs

2018-09-10 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 5d98c3194 -> ffd036a6d


[SPARK-23672][PYTHON] Document support for nested return types in scalar with 
arrow udfs

## What changes were proposed in this pull request?

Clarify docstring for Scalar functions

## How was this patch tested?

Adds a unit test showing usage similar to word count; there's an existing unit 
test for an array of floats as well.
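
A minimal, self-contained sketch of the documented behavior (session setup and
column names are illustrative, not part of the patch): a scalar pandas_udf can
return a nested type such as ArrayType(StringType()).

```python
# Hypothetical standalone example mirroring the word-count style test added here.
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])

# Each input pandas.Series of strings maps to a Series of lists of strings.
tokenize = pandas_udf(lambda s: s.apply(lambda text: text.split(" ")),
                      ArrayType(StringType()))

df.select(tokenize("vals").alias("tokens")).show()
# Expected rows: [hi, boo] and [bye, boo]
```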

Closes #20908 from 
holdenk/SPARK-23672-document-support-for-nested-return-types-in-scalar-with-arrow-udfs.

Authored-by: Holden Karau 
Signed-off-by: Bryan Cutler 
(cherry picked from commit da5685b5bb9ee7daaeb4e8f99c488ebd50c7aac3)
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffd036a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffd036a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffd036a6

Branch: refs/heads/branch-2.4
Commit: ffd036a6d13814ebcc332990be1e286939cc6abe
Parents: 5d98c31
Author: Holden Karau 
Authored: Mon Sep 10 11:01:51 2018 -0700
Committer: Bryan Cutler 
Committed: Mon Sep 10 11:02:09 2018 -0700

--
 python/pyspark/sql/functions.py |  3 ++-
 python/pyspark/sql/tests.py | 19 +++
 2 files changed, 21 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ffd036a6/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9396b16..81f35f5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2720,9 +2720,10 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 1. SCALAR
 
A scalar UDF defines a transformation: One or more `pandas.Series` -> A 
`pandas.Series`.
-   The returnType should be a primitive data type, e.g., 
:class:`DoubleType`.
The length of the returned `pandas.Series` must be of the same as the 
input `pandas.Series`.
 
+   :class:`MapType`, :class:`StructType` are currently not supported as 
output types.
+
Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
:meth:`pyspark.sql.DataFrame.select`.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ffd036a6/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6d9d636..8e5bc67 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4443,6 +4443,7 @@ class ArrowTests(ReusedSQLTestCase):
 not _have_pandas or not _have_pyarrow,
 _pandas_requirement_message or _pyarrow_requirement_message)
 class PandasUDFTests(ReusedSQLTestCase):
+
 def test_pandas_udf_basic(self):
 from pyspark.rdd import PythonEvalType
 from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -4658,6 +4659,24 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 random_udf = random_udf.asNondeterministic()
 return random_udf
 
+def test_pandas_udf_tokenize(self):
+from pyspark.sql.functions import pandas_udf
+tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
+  ArrayType(StringType()))
+self.assertEqual(tokenize.returnType, ArrayType(StringType()))
+df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
+result = df.select(tokenize("vals").alias("hi"))
+self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], 
result.collect())
+
+def test_pandas_udf_nested_arrays(self):
+from pyspark.sql.functions import pandas_udf
+tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]),
+  ArrayType(ArrayType(StringType(
+self.assertEqual(tokenize.returnType, 
ArrayType(ArrayType(StringType(
+df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
+result = df.select(tokenize("vals").alias("hi"))
+self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', 
u'boo']])], result.collect())
+
 def test_vectorized_udf_basic(self):
 from pyspark.sql.functions import pandas_udf, col, array
 df = self.spark.range(10).select(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25072][PYSPARK] Forbid extra value for custom Row

2018-09-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 9db81fd86 -> 31dab7140


[SPARK-25072][PYSPARK] Forbid extra value for custom Row

## What changes were proposed in this pull request?

Add a value length check in `_create_row` to forbid extra values for a custom 
Row in PySpark.
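
A minimal sketch of the new behavior (field names are illustrative): building a
Row instance with more values than declared fields now raises ValueError.

```python
# Hypothetical example of the added check; not part of the patch itself.
from pyspark.sql import Row

MyRow = Row("c1", "c2")

MyRow(1, 2)          # fine: Row(c1=1, c2=2)
try:
    MyRow(1, 2, 3)   # one value too many for the two declared fields
except ValueError as e:
    print(e)         # explains how many values were expected
```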

## How was this patch tested?

New UT in pyspark-sql

Closes #22140 from xuanyuanking/SPARK-25072.

Lead-authored-by: liyuanjian 
Co-authored-by: Yuanjian Li 
Signed-off-by: Bryan Cutler 
(cherry picked from commit c84bc40d7f33c71eca1c08f122cd60517f34c1f8)
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31dab714
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31dab714
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31dab714

Branch: refs/heads/branch-2.3
Commit: 31dab7140a4b271e7b976762af7a36f8bfbb8381
Parents: 9db81fd
Author: liyuanjian 
Authored: Thu Sep 6 10:17:29 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Sep 6 10:18:04 2018 -0700

--
 python/pyspark/sql/tests.py | 4 
 python/pyspark/sql/types.py | 3 +++
 2 files changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31dab714/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6bfb329..374db54 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -268,6 +268,10 @@ class DataTypeTests(unittest.TestCase):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(self):
+row_class = Row("c1", "c2")
+self.assertRaises(ValueError, lambda: row_class(1, 2, 3))
+
 
 class SQLTests(ReusedSQLTestCase):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31dab714/python/pyspark/sql/types.py
--
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index cd85740..8a8cd8d 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1532,6 +1532,9 @@ class Row(tuple):
 # let object acts like class
 def __call__(self, *args):
 """create new Row object"""
+if len(args) > len(self):
+raise ValueError("Can not create Row with fields %s, expected %d 
values "
+ "but got %s" % (self, len(self), args))
 return _create_row(self, args)
 
 def __getitem__(self, item):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25072][PYSPARK] Forbid extra value for custom Row

2018-09-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 f2d502223 -> 3682d29f4


[SPARK-25072][PYSPARK] Forbid extra value for custom Row

## What changes were proposed in this pull request?

Add a value length check in `_create_row` to forbid extra values for a custom 
Row in PySpark.

## How was this patch tested?

New UT in pyspark-sql

Closes #22140 from xuanyuanking/SPARK-25072.

Lead-authored-by: liyuanjian 
Co-authored-by: Yuanjian Li 
Signed-off-by: Bryan Cutler 
(cherry picked from commit c84bc40d7f33c71eca1c08f122cd60517f34c1f8)
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3682d29f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3682d29f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3682d29f

Branch: refs/heads/branch-2.4
Commit: 3682d29f45870031d9dc4e812accbfbb583cc52a
Parents: f2d5022
Author: liyuanjian 
Authored: Thu Sep 6 10:17:29 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Sep 6 10:17:46 2018 -0700

--
 python/pyspark/sql/tests.py | 4 
 python/pyspark/sql/types.py | 3 +++
 2 files changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3682d29f/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 81c0af0..6d9d636 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -277,6 +277,10 @@ class DataTypeTests(unittest.TestCase):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(self):
+row_class = Row("c1", "c2")
+self.assertRaises(ValueError, lambda: row_class(1, 2, 3))
+
 
 class SQLTests(ReusedSQLTestCase):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3682d29f/python/pyspark/sql/types.py
--
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 0b61707..ce1d004 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1500,6 +1500,9 @@ class Row(tuple):
 # let object acts like class
 def __call__(self, *args):
 """create new Row object"""
+if len(args) > len(self):
+raise ValueError("Can not create Row with fields %s, expected %d 
values "
+ "but got %s" % (self, len(self), args))
 return _create_row(self, args)
 
 def __getitem__(self, item):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25072][PYSPARK] Forbid extra value for custom Row

2018-09-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 3b6591b0b -> c84bc40d7


[SPARK-25072][PYSPARK] Forbid extra value for custom Row

## What changes were proposed in this pull request?

Add a value length check in `_create_row` to forbid extra values for a custom 
Row in PySpark.

## How was this patch tested?

New UT in pyspark-sql

Closes #22140 from xuanyuanking/SPARK-25072.

Lead-authored-by: liyuanjian 
Co-authored-by: Yuanjian Li 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c84bc40d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c84bc40d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c84bc40d

Branch: refs/heads/master
Commit: c84bc40d7f33c71eca1c08f122cd60517f34c1f8
Parents: 3b6591b
Author: liyuanjian 
Authored: Thu Sep 6 10:17:29 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Sep 6 10:17:29 2018 -0700

--
 python/pyspark/sql/tests.py | 4 
 python/pyspark/sql/types.py | 3 +++
 2 files changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c84bc40d/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 81c0af0..6d9d636 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -277,6 +277,10 @@ class DataTypeTests(unittest.TestCase):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(self):
+row_class = Row("c1", "c2")
+self.assertRaises(ValueError, lambda: row_class(1, 2, 3))
+
 
 class SQLTests(ReusedSQLTestCase):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c84bc40d/python/pyspark/sql/types.py
--
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 0b61707..ce1d004 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1500,6 +1500,9 @@ class Row(tuple):
 # let object acts like class
 def __call__(self, *args):
 """create new Row object"""
+if len(args) > len(self):
+raise ValueError("Can not create Row with fields %s, expected %d 
values "
+ "but got %s" % (self, len(self), args))
 return _create_row(self, args)
 
 def __getitem__(self, item):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25328][PYTHON] Add an example for having two columns as the grouping key in group aggregate pandas UDF

2018-09-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 085f731ad -> f2d502223


[SPARK-25328][PYTHON] Add an example for having two columns as the grouping key 
in group aggregate pandas UDF

## What changes were proposed in this pull request?

This PR proposes to add another example for multiple grouping keys in group 
aggregate pandas UDF, since this feature could still confuse users.
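
A self-contained sketch of the documented pattern (data values are
illustrative): when the grouped map function takes two arguments, the grouping
keys arrive as a tuple in the first argument.

```python
# Hypothetical runnable variant of the doctest added in this patch.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import ceil, pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("id long, `ceil(v / 2)` long, v double", PandasUDFType.GROUPED_MAP)
def sum_udf(key, pdf):
    # key is a tuple with the values of 'id' and 'ceil(v / 2)' for this group
    return pd.DataFrame([key + (pdf.v.sum(),)])

df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()
```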

## How was this patch tested?

Manually tested and documentation built.

Closes #22329 from HyukjinKwon/SPARK-25328.

Authored-by: hyukjinkwon 
Signed-off-by: Bryan Cutler 
(cherry picked from commit 7ef6d1daf858cc9a2c390074f92aaf56c219518a)
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2d50222
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2d50222
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2d50222

Branch: refs/heads/branch-2.4
Commit: f2d5022233b637eb50567f7945042b3a8c9c6b25
Parents: 085f731
Author: hyukjinkwon 
Authored: Thu Sep 6 08:18:49 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Sep 6 09:59:19 2018 -0700

--
 python/pyspark/sql/functions.py | 24 
 1 file changed, 20 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f2d50222/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 864780e..9396b16 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2783,14 +2783,14 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
+---+---+
 
Alternatively, the user can define a function that takes two arguments.
-   In this case, the grouping key will be passed as the first argument and 
the data will
-   be passed as the second argument. The grouping key will be passed as a 
tuple of numpy
+   In this case, the grouping key(s) will be passed as the first argument 
and the data will
+   be passed as the second argument. The grouping key(s) will be passed as 
a tuple of numpy
data types, e.g., `numpy.int32` and `numpy.float64`. The data will 
still be passed in
as a `pandas.DataFrame` containing all columns from the original Spark 
DataFrame.
-   This is useful when the user does not want to hardcode grouping key in 
the function.
+   This is useful when the user does not want to hardcode grouping key(s) 
in the function.
 
-   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> import pandas as pd  # doctest: +SKIP
+   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))  # doctest: +SKIP
@@ -2806,6 +2806,22 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  1|1.5|
|  2|6.0|
+---+---+
+   >>> @pandas_udf(
+   ..."id long, `ceil(v / 2)` long, v double",
+   ...PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
+   >>> def sum_udf(key, pdf):
+   ... # key is a tuple of two numpy.int64s, which is the values
+   ... # of 'id' and 'ceil(df.v / 2)' for the current group
+   ... return pd.DataFrame([key + (pdf.v.sum(),)])
+   >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: 
+SKIP
+   +---+---++
+   | id|ceil(v / 2)|   v|
+   +---+---++
+   |  2|  5|10.0|
+   |  1|  1| 3.0|
+   |  2|  3| 5.0|
+   |  2|  2| 3.0|
+   +---+---++
 
.. note:: If returning a new `pandas.DataFrame` constructed with a 
dictionary, it is
recommended to explicitly index the columns by name to ensure the 
positions are correct,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25328][PYTHON] Add an example for having two columns as the grouping key in group aggregate pandas UDF

2018-09-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master f5817d8bb -> 7ef6d1daf


[SPARK-25328][PYTHON] Add an example for having two columns as the grouping key 
in group aggregate pandas UDF

## What changes were proposed in this pull request?

This PR proposes to add another example for multiple grouping keys in group 
aggregate pandas UDF, since this feature could still confuse users.

## How was this patch tested?

Manually tested and documentation built.

Closes #22329 from HyukjinKwon/SPARK-25328.

Authored-by: hyukjinkwon 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ef6d1da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ef6d1da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ef6d1da

Branch: refs/heads/master
Commit: 7ef6d1daf858cc9a2c390074f92aaf56c219518a
Parents: f5817d8
Author: hyukjinkwon 
Authored: Thu Sep 6 08:18:49 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Sep 6 08:18:49 2018 -0700

--
 python/pyspark/sql/functions.py | 24 
 1 file changed, 20 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ef6d1da/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 864780e..9396b16 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2783,14 +2783,14 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
+---+---+
 
Alternatively, the user can define a function that takes two arguments.
-   In this case, the grouping key will be passed as the first argument and 
the data will
-   be passed as the second argument. The grouping key will be passed as a 
tuple of numpy
+   In this case, the grouping key(s) will be passed as the first argument 
and the data will
+   be passed as the second argument. The grouping key(s) will be passed as 
a tuple of numpy
data types, e.g., `numpy.int32` and `numpy.float64`. The data will 
still be passed in
as a `pandas.DataFrame` containing all columns from the original Spark 
DataFrame.
-   This is useful when the user does not want to hardcode grouping key in 
the function.
+   This is useful when the user does not want to hardcode grouping key(s) 
in the function.
 
-   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> import pandas as pd  # doctest: +SKIP
+   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))  # doctest: +SKIP
@@ -2806,6 +2806,22 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  1|1.5|
|  2|6.0|
+---+---+
+   >>> @pandas_udf(
+   ..."id long, `ceil(v / 2)` long, v double",
+   ...PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
+   >>> def sum_udf(key, pdf):
+   ... # key is a tuple of two numpy.int64s, which is the values
+   ... # of 'id' and 'ceil(df.v / 2)' for the current group
+   ... return pd.DataFrame([key + (pdf.v.sum(),)])
+   >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: 
+SKIP
+   +---+---++
+   | id|ceil(v / 2)|   v|
+   +---+---++
+   |  2|  5|10.0|
+   |  1|  1| 3.0|
+   |  2|  3| 5.0|
+   |  2|  2| 3.0|
+   +---+---++
 
.. note:: If returning a new `pandas.DataFrame` constructed with a 
dictionary, it is
recommended to explicitly index the columns by name to ensure the 
positions are correct,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of pyspark.sql.functions

2018-08-22 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 71f38ac24 -> 2381953ab


[SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of 
pyspark.sql.functions

## What changes were proposed in this pull request?

Include PandasUDFType in the import all of pyspark.sql.functions

## How was this patch tested?

Ran the test case from the jira 
[spark-25105](https://jira.apache.org/jira/browse/SPARK-25105?jql=project%20%3D%20SPARK%20AND%20component%20in%20(ML%2C%20PySpark%2C%20SQL%2C%20%22Structured%20Streaming%22)) 
in the pyspark shell. I manually tested it in the pyspark shell:
before:
`
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
Traceback (most recent call last):
  File "", line 1, in 
NameError: name 'PandasUDFType' is not defined
>>>
`
after:
`
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
>>>
`
Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Closes #22100 from kevinyu98/spark-25105.

Authored-by: Kevin Yu 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2381953a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2381953a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2381953a

Branch: refs/heads/master
Commit: 2381953ab5d9e86d87a9ef118f28bc3f67d6d805
Parents: 71f38ac
Author: Kevin Yu 
Authored: Wed Aug 22 10:16:47 2018 -0700
Committer: Bryan Cutler 
Committed: Wed Aug 22 10:16:47 2018 -0700

--
 python/pyspark/sql/functions.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2381953a/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f583373..d58d8d1 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2931,6 +2931,7 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 blacklist = ['map', 'since', 'ignore_unicode_prefix']
 __all__ = [k for k, v in globals().items()
if not k.startswith('_') and k[0].islower() and callable(v) and k 
not in blacklist]
+__all__ += ["PandasUDFType"]
 __all__.sort()
 
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23698][PYTHON] Resolve undefined names in Python 3

2018-08-22 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master e75488718 -> 71f38ac24


[SPARK-23698][PYTHON] Resolve undefined names in Python 3

## What changes were proposed in this pull request?

Fix issues arising from the fact that the builtins `file`, `long`, 
`raw_input()`, `unicode`, `xrange()`, etc. were all removed from Python 3. 
Undefined names have the potential to raise 
[NameError](https://docs.python.org/3/library/exceptions.html#NameError) at 
runtime.

## How was this patch tested?
* $ __python2 -m flake8 . --count --select=E9,F82 --show-source --statistics__
* $ __python3 -m flake8 . --count --select=E9,F82 --show-source --statistics__

holdenk

flake8 testing of https://github.com/apache/spark on Python 3.6.3

$ __python3 -m flake8 . --count --select=E901,E999,F821,F822,F823 --show-source 
--statistics__
```
./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input'
result = raw_input("\n%s (y/n): " % prompt)
 ^
./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input'
primary_author = raw_input(
 ^
./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input'
pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
   ^
./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input'
jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
  ^
./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input'
fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % 
default_fix_versions)
   ^
./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input'
raw_assignee = raw_input(
   ^
./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input'
pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): 
")
 ^
./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input'
result = raw_input("Would you like to use the modified title? (y/n): ")
 ^
./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input'
while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
  ^
./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input'
response = raw_input("%s [y/n]: " % msg)
   ^
./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode'
author = unidecode.unidecode(unicode(author, "UTF-8")).strip()
 ^
./python/setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
  ^
./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer'
dispatch[buffer] = save_buffer
 ^
./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file'
dispatch[file] = save_file
 ^
./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode'
if not isinstance(obj, str) and not isinstance(obj, unicode):
^
./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long'
intlike = (int, long)
^
./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long'
return self._sc._jvm.Time(long(timestamp * 1000))
  ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 
undefined name 'xrange'
for i in xrange(50):
 ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 
undefined name 'xrange'
for j in xrange(5):
 ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 
undefined name 'xrange'
for k in xrange(20022):
 ^
20    F821 undefined name 'raw_input'
20
```
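
A minimal sketch (not the project's exact change, which touches each listed
file individually) of the kind of guard these findings call for: alias the
removed Python 2 builtins under Python 3 so the names stay defined.

```python
# Hypothetical compatibility shim for illustration only.
import sys

if sys.version_info[0] >= 3:
    raw_input = input   # raw_input() was removed in Python 3
    unicode = str       # unicode was merged into str
    long = int          # long was merged into int
    xrange = range      # xrange was renamed to range

# The Python 2 spellings now resolve on both interpreters.
print(long(42), unicode("spark"))
print(list(xrange(3)))
```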

Closes #20838 from cclauss/fix-undefined-names.

Authored-by: cclauss 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71f38ac2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71f38ac2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71f38ac2

Branch: refs/heads/master
Commit: 71f38ac242157cbede684546159f2a27892ee09f
Parents: e754887
Author: cclauss 
Authored: Wed Aug 22 10:06:59 2018 -0700
Committer: Bryan Cutler 
Committed: Wed Aug 22 10:06:59 2018 -0700

--
 dev/create-release/releaseutils.py  |  8 +++--
 dev/merge_spark_pr.py   |  2 +-
 python/pyspark/sql/conf.py  |  5 ++-
 python/pyspark/sql/streaming.py |  5 +--
 python/pyspark/streaming/dstream.py |  2 ++
 python/pyspark/streaming/tests.py   | 34 +++-
 .../resources/data/scripts/dumpdata_script.py   |  3 ++
 7 files changed, 50 insertions(+), 9 deletions(-)
--



spark git commit: [SPARK-23555][PYTHON] Add BinaryType support for Arrow in Python

2018-08-17 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master ba84bcb2c -> 10f2b6fa0


[SPARK-23555][PYTHON] Add BinaryType support for Arrow in Python

## What changes were proposed in this pull request?

Adding `BinaryType` support for Arrow in pyspark, conditional on using pyarrow 
>= 0.10.0. Earlier versions will continue to raise a TypeError.
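
A minimal sketch of what this enables (assumes pyarrow >= 0.10.0 is installed;
the Arrow conf name below is the one used at the time and is an assumption):

```python
# Hypothetical usage example; with older pyarrow the same code still raises
# 'Unsupported type ... BinaryType', which the fallback tests cover.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, BinaryType

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")  # assumed conf key

schema = StructType([StructField("b", BinaryType(), True)])
df = spark.createDataFrame([(bytearray(b"a"),), (bytearray(b"bb"),)], schema=schema)

pdf = df.toPandas()                              # Arrow path, binary round-trips
df2 = spark.createDataFrame(pdf, schema=schema)  # and back again
df2.show()
```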

## How was this patch tested?

Additional unit tests in pyspark for code paths that use Arrow for 
createDataFrame, toPandas, and scalar pandas_udfs.

Closes #20725 from BryanCutler/arrow-binary-type-support-SPARK-23555.

Authored-by: Bryan Cutler 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10f2b6fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10f2b6fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10f2b6fa

Branch: refs/heads/master
Commit: 10f2b6fa05f3d977f3b6099fcd94c5c0cd97a0cb
Parents: ba84bcb
Author: Bryan Cutler 
Authored: Fri Aug 17 22:14:42 2018 -0700
Committer: Bryan Cutler 
Committed: Fri Aug 17 22:14:42 2018 -0700

--
 python/pyspark/sql/tests.py | 66 +---
 python/pyspark/sql/types.py | 15 +
 2 files changed, 70 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/10f2b6fa/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 91ed600..00d7e18 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4050,6 +4050,8 @@ class ArrowTests(ReusedSQLTestCase):
 def setUpClass(cls):
 from datetime import date, datetime
 from decimal import Decimal
+from distutils.version import LooseVersion
+import pyarrow as pa
 ReusedSQLTestCase.setUpClass()
 
 # Synchronize default timezone between Python and Java
@@ -4078,6 +4080,13 @@ class ArrowTests(ReusedSQLTestCase):
 (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
  date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]
 
+# TODO: remove version check once minimum pyarrow version is 0.10.0
+if LooseVersion("0.10.0") <= LooseVersion(pa.__version__):
+cls.schema.add(StructField("9_binary_t", BinaryType(), True))
+cls.data[0] = cls.data[0] + (bytearray(b"a"),)
+cls.data[1] = cls.data[1] + (bytearray(b"bb"),)
+cls.data[2] = cls.data[2] + (bytearray(b"ccc"),)
+
 @classmethod
 def tearDownClass(cls):
 del os.environ["TZ"]
@@ -4115,12 +4124,23 @@ class ArrowTests(ReusedSQLTestCase):
 self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 
1}]}))
 
 def test_toPandas_fallback_disabled(self):
+from distutils.version import LooseVersion
+import pyarrow as pa
+
 schema = StructType([StructField("map", MapType(StringType(), 
IntegerType()), True)])
 df = self.spark.createDataFrame([(None,)], schema=schema)
 with QuietTest(self.sc):
 with self.assertRaisesRegexp(Exception, 'Unsupported type'):
 df.toPandas()
 
+# TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
+if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
+schema = StructType([StructField("binary", BinaryType(), True)])
+df = self.spark.createDataFrame([(None,)], schema=schema)
+with QuietTest(self.sc):
+with self.assertRaisesRegexp(Exception, 'Unsupported 
type.*BinaryType'):
+df.toPandas()
+
 def test_null_conversion(self):
 df_null = self.spark.createDataFrame([tuple([None for _ in 
range(len(self.data[0]))])] +
  self.data)
@@ -4232,19 +4252,22 @@ class ArrowTests(ReusedSQLTestCase):
 
 def test_createDataFrame_with_incorrect_schema(self):
 pdf = self.create_pandas_data_frame()
-wrong_schema = StructType(list(reversed(self.schema)))
+fields = list(self.schema)
+fields[0], fields[7] = fields[7], fields[0]  # swap str with timestamp
+wrong_schema = StructType(fields)
 with QuietTest(self.sc):
 with self.assertRaisesRegexp(Exception, ".*No 
cast.*string.*timestamp.*"):
 self.spark.createDataFrame(pdf, schema=wrong_schema)
 
 def test_createDataFrame_with_names(self):
 pdf = self.create_pandas_data_frame()
+new_names = list(map(str, range(len(self.schema.fieldNames()
 # Test that schema as a list of column names gets applied
-df = self.spark.createDataFrame(pdf, schema=list('abcdefgh'))
-self.assertEquals(df.schema.fieldNames(), list('abcdefgh'))
+df = 

spark git commit: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10.0

2018-08-14 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 92fd7f321 -> ed075e1ff


[SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10.0

## What changes were proposed in this pull request?

Upgrade Apache Arrow to 0.10.0

Version 0.10.0 has a number of bug fixes and improvements with the following 
pertaining directly to usage in Spark:
 * Allow for adding BinaryType support ARROW-2141
 * Bug fix related to array serialization ARROW-1973
 * Python2 str will be made into an Arrow string instead of bytes ARROW-2101
 * Python bytearrays are supported as input to pyarrow ARROW-2141
 * Java has a common interface for reset to clean up complex vectors in Spark 
ArrowWriter ARROW-1962
 * Cleanup pyarrow type equality checks ARROW-2423
 * ArrowStreamWriter should not hold references to ArrowBlocks ARROW-2632, 
ARROW-2645
 * Improved low level handling of messages for RecordBatch ARROW-2704

## How was this patch tested?

existing tests

Author: Bryan Cutler 

Closes #21939 from BryanCutler/arrow-upgrade-010.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed075e1f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed075e1f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed075e1f

Branch: refs/heads/master
Commit: ed075e1ff60cbb3e7b80b9d2f2ff37054412b934
Parents: 92fd7f3
Author: Bryan Cutler 
Authored: Tue Aug 14 17:13:38 2018 -0700
Committer: Bryan Cutler 
Committed: Tue Aug 14 17:13:38 2018 -0700

--
 dev/deps/spark-deps-hadoop-2.6  |  6 +++---
 dev/deps/spark-deps-hadoop-2.7  |  6 +++---
 dev/deps/spark-deps-hadoop-3.1  |  6 +++---
 pom.xml |  2 +-
 python/pyspark/serializers.py   |  2 ++
 .../spark/sql/vectorized/ArrowColumnVector.java | 12 ++--
 .../spark/sql/execution/arrow/ArrowWriter.scala | 20 +++-
 .../vectorized/ArrowColumnVectorSuite.scala |  4 ++--
 8 files changed, 23 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 3c0952f..bdab79c 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 310f1e4..ddaf9bb 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-3.1
--
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 9bff2a1..d25d7aa 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -12,9 +12,9 @@ aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 45fca28..979d709 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,7 @@
 If you are changing Arrow version specification, please check 
./python/pyspark/sql/utils.py,
 ./python/run-tests.py and ./python/setup.py too.
 -->
-0.8.0
+0.10.0
 
 ${java.home}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 82abf19..47c4c3e 100644
--- 

spark git commit: [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0)

2018-07-31 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fc3df4517 -> 5b187a85a


[SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to 
PyArrow 0.9.0)

## What changes were proposed in this pull request?

See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). It seems that 
using `from_pandas` to convert decimals fails if it encounters a value of `None`:

```python
import pyarrow as pa
import pandas as pd
from decimal import Decimal

pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 
2))
```

**Arrow 0.8.0**

```

[
  Decimal('3.14'),
  NA
]
```

**Arrow 0.9.0**

```
Traceback (most recent call last):
  File "", line 1, in 
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got 
Python object of type NoneType but can only handle these types: decimal.Decimal
```

This PR proposes to work around this via `Decimal('NaN')`:

```python
pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), 
type=pa.decimal128(3, 2))
```

```

[
  Decimal('3.14'),
  NA
]
```
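
A minimal sketch of the workaround pattern (illustrative only, not the exact
serializer code): substitute `Decimal('NaN')` for `None` before handing the
series to pyarrow, so pyarrow 0.9.0 can build the decimal array.

```python
# Hypothetical standalone illustration of the substitution described above.
from decimal import Decimal

import pandas as pd
import pyarrow as pa

s = pd.Series([Decimal('3.14'), None])
s = s.apply(lambda v: Decimal('NaN') if v is None else v)  # None -> Decimal('NaN')
arr = pa.Array.from_pandas(s, type=pa.decimal128(3, 2))
print(arr)  # the None slot comes back as NA, matching the Arrow 0.8.0 output
```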

## How was this patch tested?

Manually tested:

```bash
SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests
```

**Before**

```
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 4672, in 
test_vectorized_udf_null_decimal
self.assertEquals(df.collect(), res.collect())
  File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect
sock_info = self._jdf.collectToPython()
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 
1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, 
in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o51.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in 
stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 
7, localhost, executor driver): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):
  File "/.../spark/python/pyspark/worker.py", line 320, in main
process()
  File "/.../spark/python/pyspark/worker.py", line 315, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream
batch = _create_batch(series, self._timezone)
  File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch
arrs = [create_array(s, t) for s, t in series]
  File "/.../spark/python/pyspark/serializers.py", line 241, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
ArrowInvalid: Error converting from Python objects to Decimal: Got Python 
object of type NoneType but can only handle these types: decimal.Decimal
```

**After**

```
Running tests...
--
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
...S.
--
Ran 37 tests in 21.980s
```

Author: hyukjinkwon 

Closes #21928 from HyukjinKwon/SPARK-24976.

(cherry picked from commit f4772fd26f32b11ae54e7721924b5cf6eb27298a)
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b187a85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b187a85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b187a85

Branch: refs/heads/branch-2.3
Commit: 5b187a85a24c788e19742e03f1300662d475bab8
Parents: fc3df45
Author: hyukjinkwon 
Authored: Tue Jul 31 17:24:24 2018 -0700
Committer: Bryan Cutler 
Committed: Tue Jul 31 17:24:55 2018 -0700

--
 python/pyspark/serializers.py | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5b187a85/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 6d107f3..52a7afe 100644
--- a/python/pyspark/serializers.py

spark git commit: [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0)

2018-07-31 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 42dfe4f15 -> f4772fd26


[SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to 
PyArrow 0.9.0)

## What changes were proposed in this pull request?

See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). It seems that 
using `from_pandas` to convert decimals fails if it encounters a value of `None`:

```python
import pyarrow as pa
import pandas as pd
from decimal import Decimal

pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 
2))
```

**Arrow 0.8.0**

```

[
  Decimal('3.14'),
  NA
]
```

**Arrow 0.9.0**

```
Traceback (most recent call last):
  File "", line 1, in 
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got 
Python object of type NoneType but can only handle these types: decimal.Decimal
```

This PR proposes to work around this via `Decimal('NaN')`:

```python
pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), 
type=pa.decimal128(3, 2))
```

```

[
  Decimal('3.14'),
  NA
]
```

## How was this patch tested?

Manually tested:

```bash
SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests
```

**Before**

```
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 4672, in 
test_vectorized_udf_null_decimal
self.assertEquals(df.collect(), res.collect())
  File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect
sock_info = self._jdf.collectToPython()
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 
1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, 
in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o51.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in 
stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 
7, localhost, executor driver): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):
  File "/.../spark/python/pyspark/worker.py", line 320, in main
process()
  File "/.../spark/python/pyspark/worker.py", line 315, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream
batch = _create_batch(series, self._timezone)
  File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch
arrs = [create_array(s, t) for s, t in series]
  File "/.../spark/python/pyspark/serializers.py", line 241, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
ArrowInvalid: Error converting from Python objects to Decimal: Got Python 
object of type NoneType but can only handle these types: decimal.Decimal
```

**After**

```
Running tests...
--
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
...S.
--
Ran 37 tests in 21.980s
```

Author: hyukjinkwon 

Closes #21928 from HyukjinKwon/SPARK-24976.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4772fd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4772fd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4772fd2

Branch: refs/heads/master
Commit: f4772fd26f32b11ae54e7721924b5cf6eb27298a
Parents: 42dfe4f
Author: hyukjinkwon 
Authored: Tue Jul 31 17:24:24 2018 -0700
Committer: Bryan Cutler 
Committed: Tue Jul 31 17:24:24 2018 -0700

--
 python/pyspark/serializers.py | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f4772fd2/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 4c16b5f..82abf19 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -216,9 +216,10 @@ def _create_batch(series, timezone):
 :param 

spark git commit: [SPARK-24439][ML][PYTHON] Add distanceMeasure to BisectingKMeans in PySpark

2018-06-28 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master e1d3f8010 -> 2224861f2


[SPARK-24439][ML][PYTHON] Add distanceMeasure to BisectingKMeans in PySpark

## What changes were proposed in this pull request?

Add `distanceMeasure` to BisectingKMeans in Python.
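
A minimal usage sketch of the new parameter (data values are illustrative):

```python
# Hypothetical example; distanceMeasure accepts 'euclidean' (default) or 'cosine'.
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = [(Vectors.dense([0.1, 0.1]),), (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ["features"])

bkm = BisectingKMeans(k=2, distanceMeasure="cosine", seed=1)
print(bkm.getDistanceMeasure())   # 'cosine'
model = bkm.fit(df)
print(model.clusterCenters())
```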

## How was this patch tested?

Added a doctest and also manually tested it.

Author: Huaxin Gao 

Closes #21557 from huaxingao/spark-24439.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2224861f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2224861f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2224861f

Branch: refs/heads/master
Commit: 2224861f2f93830d736b625c9a4cb72c918512b2
Parents: e1d3f80
Author: Huaxin Gao 
Authored: Thu Jun 28 14:07:28 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Jun 28 14:07:28 2018 -0700

--
 python/pyspark/ml/clustering.py | 35 ++--
 .../pyspark/ml/param/_shared_params_code_gen.py |  4 ++-
 python/pyspark/ml/param/shared.py   | 24 ++
 3 files changed, 51 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2224861f/python/pyspark/ml/clustering.py
--
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 4aa1cf8..6d77baf 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -349,8 +349,8 @@ class KMeansModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 
 @inherit_doc
-class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, 
HasTol, HasSeed,
- JavaMLWritable, JavaMLReadable):
+class KMeans(JavaEstimator, HasDistanceMeasure, HasFeaturesCol, 
HasPredictionCol, HasMaxIter,
+ HasTol, HasSeed, JavaMLWritable, JavaMLReadable):
 """
 K-means clustering with a k-means++ like initialization mode
 (the k-means|| algorithm by Bahmani et al).
@@ -406,9 +406,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIter, HasTol
  typeConverter=TypeConverters.toString)
 initSteps = Param(Params._dummy(), "initSteps", "The number of steps for 
k-means|| " +
   "initialization mode. Must be > 0.", 
typeConverter=TypeConverters.toInt)
-distanceMeasure = Param(Params._dummy(), "distanceMeasure", "The distance 
measure. " +
-"Supported options: 'euclidean' and 'cosine'.",
-typeConverter=TypeConverters.toString)
 
 @keyword_only
 def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
@@ -544,8 +541,8 @@ class BisectingKMeansModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 
 @inherit_doc
-class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, 
HasMaxIter, HasSeed,
-  JavaMLWritable, JavaMLReadable):
+class BisectingKMeans(JavaEstimator, HasDistanceMeasure, HasFeaturesCol, 
HasPredictionCol,
+  HasMaxIter, HasSeed, JavaMLWritable, JavaMLReadable):
 """
 A bisecting k-means algorithm based on the paper "A comparison of document 
clustering
 techniques" by Steinbach, Karypis, and Kumar, with modification to fit 
Spark.
@@ -585,6 +582,8 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIte
 >>> bkm2 = BisectingKMeans.load(bkm_path)
 >>> bkm2.getK()
 2
+>>> bkm2.getDistanceMeasure()
+'euclidean'
 >>> model_path = temp_path + "/bkm_model"
 >>> model.save(model_path)
 >>> model2 = BisectingKMeansModel.load(model_path)
@@ -607,10 +606,10 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIte
 
 @keyword_only
 def __init__(self, featuresCol="features", predictionCol="prediction", 
maxIter=20,
- seed=None, k=4, minDivisibleClusterSize=1.0):
+ seed=None, k=4, minDivisibleClusterSize=1.0, 
distanceMeasure="euclidean"):
 """
 __init__(self, featuresCol="features", predictionCol="prediction", 
maxIter=20, \
- seed=None, k=4, minDivisibleClusterSize=1.0)
+ seed=None, k=4, minDivisibleClusterSize=1.0, 
distanceMeasure="euclidean")
 """
 super(BisectingKMeans, self).__init__()
 self._java_obj = 
self._new_java_obj("org.apache.spark.ml.clustering.BisectingKMeans",
@@ -622,10 +621,10 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIte
 @keyword_only
 @since("2.0.0")
 def setParams(self, featuresCol="features", predictionCol="prediction", 
maxIter=20,
-  seed=None, k=4, minDivisibleClusterSize=1.0):
+  seed=None, k=4, minDivisibleClusterSize=1.0, 

spark git commit: [SPARK-24324][PYTHON][FOLLOWUP] Grouped Map positional conf should have deprecation note

2018-06-25 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 6d16b9885 -> d48803bf6


[SPARK-24324][PYTHON][FOLLOWUP] Grouped Map positional conf should have 
deprecation note

## What changes were proposed in this pull request?

Follow-up to the discussion of the conf added in SPARK-24324, which allows 
assignment by column position only. This conf preserves the old behavior and 
will be removed in future releases, so it should have a note indicating that.
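
For context, a minimal sketch (schema and data are illustrative) of the default
by-name assignment that this conf can switch back to positional:

```python
# Hypothetical example; with the conf left at its default, columns of the
# returned pandas.DataFrame are matched to the declared schema by label.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def plus_one(pdf):
    # Built from a dict in a different order than the schema; values are still
    # assigned to 'id' and 'v' by name rather than by position.
    return pd.DataFrame({"v": pdf.v + 1.0, "id": pdf.id})

df.groupby("id").apply(plus_one).show()
```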

## How was this patch tested?

NA

Author: Bryan Cutler 

Closes #21637 from 
BryanCutler/arrow-groupedMap-conf-deprecate-followup-SPARK-24324.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d48803bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d48803bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d48803bf

Branch: refs/heads/master
Commit: d48803bf64dc0fccd6f560738b4682f0c05e767a
Parents: 6d16b98
Author: Bryan Cutler 
Authored: Mon Jun 25 17:08:23 2018 -0700
Committer: Bryan Cutler 
Committed: Mon Jun 25 17:08:23 2018 -0700

--
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d48803bf/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d5fb524..e768416 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1167,7 +1167,7 @@ object SQLConf {
   .doc("When true, a grouped map Pandas UDF will assign columns from the 
returned " +
 "Pandas DataFrame based on position, regardless of column label type. 
When false, " +
 "columns will be looked up by name if labeled with a string and 
fallback to use " +
-"position if not.")
+"position if not. This configuration will be deprecated in future 
releases.")
   .booleanConf
   .createWithDefault(false)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23161][PYSPARK][ML] Add missing APIs to Python GBTClassifier

2018-05-30 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master b142157dc -> ec6f971dc


[SPARK-23161][PYSPARK][ML] Add missing APIs to Python GBTClassifier

## What changes were proposed in this pull request?

Add `featureSubsetStrategy` to GBTClassifier and GBTRegressor. Also make 
GBTClassificationModel inherit from JavaClassificationModel instead of the 
prediction model so it will have `numClasses`.
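
A minimal usage sketch of the added API (toy data, illustrative values):

```python
# Hypothetical example; featureSubsetStrategy is now settable on GBTClassifier,
# and the fitted model exposes numClasses.
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(0.0, Vectors.dense([0.0, 1.0])), (1.0, Vectors.dense([1.0, 0.0]))],
    ["label", "features"])

gbt = GBTClassifier(maxIter=3, maxDepth=2, featureSubsetStrategy="all", seed=42)
print(gbt.getFeatureSubsetStrategy())   # 'all'
model = gbt.fit(df)
print(model.numClasses)                 # 2
```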

## How was this patch tested?

Add tests in doctest

Author: Huaxin Gao 

Closes #21413 from huaxingao/spark-23161.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec6f971d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec6f971d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec6f971d

Branch: refs/heads/master
Commit: ec6f971dc57bcdc0ad65ac1987b6f0c1801157f4
Parents: b142157
Author: Huaxin Gao 
Authored: Wed May 30 11:04:09 2018 -0700
Committer: Bryan Cutler 
Committed: Wed May 30 11:04:09 2018 -0700

--
 python/pyspark/ml/classification.py | 35 +---
 python/pyspark/ml/regression.py | 70 
 2 files changed, 74 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec6f971d/python/pyspark/ml/classification.py
--
diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index 424ecfd..1754c48 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1131,6 +1131,13 @@ class RandomForestClassifier(JavaEstimator, 
HasFeaturesCol, HasLabelCol, HasPred
 def _create_model(self, java_model):
 return RandomForestClassificationModel(java_model)
 
+@since("2.4.0")
+def setFeatureSubsetStrategy(self, value):
+"""
+Sets the value of :py:attr:`featureSubsetStrategy`.
+"""
+return self._set(featureSubsetStrategy=value)
+
 
 class RandomForestClassificationModel(TreeEnsembleModel, 
JavaClassificationModel, JavaMLWritable,
   JavaMLReadable):
@@ -1193,6 +1200,8 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredictionCol
 >>> si_model = stringIndexer.fit(df)
 >>> td = si_model.transform(df)
 >>> gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
+>>> gbt.getFeatureSubsetStrategy()
+'all'
 >>> model = gbt.fit(td)
 >>> model.featureImportances
 SparseVector(1, {0: 1.0})
@@ -1226,6 +1235,8 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredictionCol
 ...  ["indexed", "features"])
 >>> model.evaluateEachIteration(validation)
 [0.25..., 0.23..., 0.21..., 0.19..., 0.18...]
+>>> model.numClasses
+2
 
 .. versionadded:: 1.4.0
 """
@@ -1244,19 +1255,22 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredictionCol
 def __init__(self, featuresCol="features", labelCol="label", 
predictionCol="prediction",
  maxDepth=5, maxBins=32, minInstancesPerNode=1, 
minInfoGain=0.0,
  maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, 
lossType="logistic",
- maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0):
+ maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0,
+ featureSubsetStrategy="all"):
 """
 __init__(self, featuresCol="features", labelCol="label", 
predictionCol="prediction", \
  maxDepth=5, maxBins=32, minInstancesPerNode=1, 
minInfoGain=0.0, \
  maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, 
\
- lossType="logistic", maxIter=20, stepSize=0.1, seed=None, 
subsamplingRate=1.0)
+ lossType="logistic", maxIter=20, stepSize=0.1, seed=None, 
subsamplingRate=1.0, \
+ featureSubsetStrategy="all")
 """
 super(GBTClassifier, self).__init__()
 self._java_obj = self._new_java_obj(
 "org.apache.spark.ml.classification.GBTClassifier", self.uid)
 self._setDefault(maxDepth=5, maxBins=32, minInstancesPerNode=1, 
minInfoGain=0.0,
  maxMemoryInMB=256, cacheNodeIds=False, 
checkpointInterval=10,
- lossType="logistic", maxIter=20, stepSize=0.1, 
subsamplingRate=1.0)
+ lossType="logistic", maxIter=20, stepSize=0.1, 
subsamplingRate=1.0,
+ featureSubsetStrategy="all")
 kwargs = self._input_kwargs
 self.setParams(**kwargs)
 
@@ -1265,12 +1279,14 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredictionCol
 def setParams(self, featuresCol="features", labelCol="label", 
predictionCol="prediction",
 

spark git commit: [SPARK-24303][PYTHON] Update cloudpickle to v0.4.4

2018-05-18 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 7b2dca5b1 -> 0cf59fcbe


[SPARK-24303][PYTHON] Update cloudpickle to v0.4.4

## What changes were proposed in this pull request?

cloudpickle 0.4.4 is released - 
https://github.com/cloudpipe/cloudpickle/releases/tag/v0.4.4

There's no invasive change - the main difference is that we are now able to 
pickle the root logger, and that fix is well isolated.
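
An illustrative sketch (assuming PySpark's bundled cloudpickle is importable) of the 
behavior picked up from 0.4.4:

```python
import logging
import pickle

from pyspark import cloudpickle

root = logging.getLogger()           # the logging.RootLogger singleton
payload = cloudpickle.dumps(root)    # previously failed; now reduces to logging.getLogger
assert pickle.loads(payload) is logging.getLogger()
```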

## How was this patch tested?

Jenkins tests.

Author: hyukjinkwon 

Closes #21350 from HyukjinKwon/SPARK-24303.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cf59fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cf59fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cf59fcb

Branch: refs/heads/master
Commit: 0cf59fcbe3799dd3c4469cbf8cd842d668a76f34
Parents: 7b2dca5
Author: hyukjinkwon 
Authored: Fri May 18 09:53:24 2018 -0700
Committer: Bryan Cutler 
Committed: Fri May 18 09:53:24 2018 -0700

--
 python/pyspark/cloudpickle.py | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0cf59fcb/python/pyspark/cloudpickle.py
--
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index ea845b9..88519d7 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -272,7 +272,7 @@ class CloudPickler(Pickler):
 if not PY3:
 def save_buffer(self, obj):
 self.save(str(obj))
-dispatch[buffer] = save_buffer
+dispatch[buffer] = save_buffer  # noqa: F821 'buffer' was removed in 
Python 3
 
 def save_unsupported(self, obj):
 raise pickle.PicklingError("Cannot pickle objects of type %s" % 
type(obj))
@@ -801,10 +801,10 @@ class CloudPickler(Pickler):
 def save_not_implemented(self, obj):
 self.save_reduce(_gen_not_implemented, ())
 
-if PY3:
-dispatch[io.TextIOWrapper] = save_file
-else:
+try:   # Python 2
 dispatch[file] = save_file
+except NameError:  # Python 3
+dispatch[io.TextIOWrapper] = save_file
 
 dispatch[type(Ellipsis)] = save_ellipsis
 dispatch[type(NotImplemented)] = save_not_implemented
@@ -819,6 +819,11 @@ class CloudPickler(Pickler):
 
 dispatch[logging.Logger] = save_logger
 
+def save_root_logger(self, obj):
+self.save_reduce(logging.getLogger, (), obj=obj)
+
+dispatch[logging.RootLogger] = save_root_logger
+
 """Special functions for Add-on libraries"""
 def inject_addons(self):
 """Plug in system. Register additional pickling functions if modules 
already loaded"""


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24044][PYTHON] Explicitly print out skipped tests from unittest module

2018-04-26 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 4f1e38649 -> f7435bec6


[SPARK-24044][PYTHON] Explicitly print out skipped tests from unittest module

## What changes were proposed in this pull request?

This PR proposes to remove duplicated dependency-checking logic and also print 
out skipped tests from the unittest module.

For example, as below:

```
Skipped tests in pyspark.sql.tests with pypy:
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) 
... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) 
... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
...

Skipped tests in pyspark.sql.tests with python3:
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) 
... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) 
... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
...
```

Currently, skipped tests are not printed to the console. It would be better to 
print them out there explicitly.
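
A tiny, self-contained sketch of the mechanism relied on here: with `verbosity=2`, the 
unittest runner lists each skipped test together with its skip reason.

```python
import unittest


class ExampleTests(unittest.TestCase):

    @unittest.skipIf(True, "Pandas >= 0.19.2 must be installed; however, it was not found.")
    def test_needs_pandas(self):
        pass


if __name__ == "__main__":
    # verbosity=2 prints every test name, including skips and their reasons.
    unittest.main(verbosity=2)
```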

## How was this patch tested?

Manually tested. Also, fortunately, Jenkins provides a good environment for 
verifying the skipped-test output.

Author: hyukjinkwon 

Closes #21107 from HyukjinKwon/skipped-tests-print.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7435bec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7435bec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7435bec

Branch: refs/heads/master
Commit: f7435bec6a9348cfbbe26b13c230c08545d16067
Parents: 4f1e386
Author: hyukjinkwon 
Authored: Thu Apr 26 15:11:42 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Apr 26 15:11:42 2018 -0700

--
 python/pyspark/ml/tests.py|  16 +++--
 python/pyspark/mllib/tests.py |   4 +-
 python/pyspark/sql/tests.py   |  51 +--
 python/pyspark/streaming/tests.py |   4 +-
 python/pyspark/tests.py   |  12 +---
 python/run-tests.py   | 115 +++--
 6 files changed, 98 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7435bec/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 2ec0be6..0935931 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -2136,17 +2136,23 @@ class ImageReaderTest2(PySparkTestCase):
 @classmethod
 def setUpClass(cls):
 super(ImageReaderTest2, cls).setUpClass()
+cls.hive_available = True
 # Note that here we enable Hive's support.
 cls.spark = None
 try:
 cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
 except py4j.protocol.Py4JError:
 cls.tearDownClass()
-raise unittest.SkipTest("Hive is not available")
+cls.hive_available = False
 except TypeError:
 cls.tearDownClass()
-raise unittest.SkipTest("Hive is not available")
-cls.spark = HiveContext._createForTesting(cls.sc)
+cls.hive_available = False
+if cls.hive_available:
+cls.spark = HiveContext._createForTesting(cls.sc)
+
+def setUp(self):
+if not self.hive_available:
+self.skipTest("Hive is not available.")
 
 @classmethod
 def tearDownClass(cls):
@@ -2662,6 +2668,6 @@ class EstimatorTest(unittest.TestCase):
 if __name__ == "__main__":
 from pyspark.ml.tests import *
 if xmlrunner:
-
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), 
verbosity=2)
 else:
-unittest.main()
+unittest.main(verbosity=2)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7435bec/python/pyspark/mllib/tests.py
--
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 1037bab..14d788b 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -1767,9 +1767,9 @@ if __name__ == "__main__":
 if not _have_scipy:
 print("NOTE: Skipping SciPy tests as it does not seem to be installed")
 if xmlrunner:
-
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), 
verbosity=2)
 else:
-unittest.main()
+unittest.main(verbosity=2)
 if not 

spark git commit: [SPARK-24057][PYTHON] put the real data type in the AssertionError message

2018-04-26 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master ce2f919f8 -> 4f1e38649


[SPARK-24057][PYTHON] put the real data type in the AssertionError message

## What changes were proposed in this pull request?

Print out the data type in the AssertionError message to make it more 
meaningful.
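
A quick sketch of the user-visible difference (assumes PySpark is importable):

```python
from pyspark.sql.types import ArrayType, StringType

ArrayType(StringType())      # correct usage: elementType is a DataType instance
try:
    ArrayType("string")      # wrong: a plain string, not a DataType
except AssertionError as e:
    # Now reads along the lines of:
    # "elementType string should be an instance of <class 'pyspark.sql.types.DataType'>"
    print(e)
```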

## How was this patch tested?

I manually tested the changed code locally, but didn't add any test.

Author: Huaxin Gao 

Closes #21159 from huaxingao/spark-24057.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f1e3864
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f1e3864
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f1e3864

Branch: refs/heads/master
Commit: 4f1e38649ebc7710850b7c40e6fb355775e7bb7f
Parents: ce2f919
Author: Huaxin Gao 
Authored: Thu Apr 26 14:21:22 2018 -0700
Committer: Bryan Cutler 
Committed: Thu Apr 26 14:21:22 2018 -0700

--
 python/pyspark/sql/types.py | 14 +-
 1 file changed, 9 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f1e3864/python/pyspark/sql/types.py
--
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 1f65348..3cd7a2e 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -289,7 +289,8 @@ class ArrayType(DataType):
 >>> ArrayType(StringType(), False) == ArrayType(StringType())
 False
 """
-assert isinstance(elementType, DataType), "elementType should be 
DataType"
+assert isinstance(elementType, DataType),\
+"elementType %s should be an instance of %s" % (elementType, 
DataType)
 self.elementType = elementType
 self.containsNull = containsNull
 
@@ -343,8 +344,10 @@ class MapType(DataType):
 ...== MapType(StringType(), FloatType()))
 False
 """
-assert isinstance(keyType, DataType), "keyType should be DataType"
-assert isinstance(valueType, DataType), "valueType should be DataType"
+assert isinstance(keyType, DataType),\
+"keyType %s should be an instance of %s" % (keyType, DataType)
+assert isinstance(valueType, DataType),\
+"valueType %s should be an instance of %s" % (valueType, DataType)
 self.keyType = keyType
 self.valueType = valueType
 self.valueContainsNull = valueContainsNull
@@ -402,8 +405,9 @@ class StructField(DataType):
 ...  == StructField("f2", StringType(), True))
 False
 """
-assert isinstance(dataType, DataType), "dataType should be DataType"
-assert isinstance(name, basestring), "field name should be string"
+assert isinstance(dataType, DataType),\
+"dataType %s should be an instance of %s" % (dataType, DataType)
+assert isinstance(name, basestring), "field name %s should be string" 
% (name)
 if not isinstance(name, str):
 name = name.encode('utf-8')
 self.name = name


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23828][ML][PYTHON] PySpark StringIndexerModel should have constructor from labels

2018-04-06 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master b6935ffb4 -> e99825058


[SPARK-23828][ML][PYTHON] PySpark StringIndexerModel should have constructor 
from labels

## What changes were proposed in this pull request?

The Scala StringIndexerModel has an alternate constructor that will create the 
model from an array of label strings.  Add the corresponding Python API:

model = StringIndexerModel.from_labels(["a", "b", "c"])
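
A slightly fuller sketch mirroring the doctest added below; the Spark session and a 
DataFrame `df` with a string column "label" are assumed:

```python
from pyspark.ml.feature import StringIndexerModel

model = StringIndexerModel.from_labels(
    ["a", "b", "c"], inputCol="label", outputCol="indexed", handleInvalid="error")
indexed = model.transform(df)   # "a" -> 0.0, "b" -> 1.0, "c" -> 2.0
```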

## How was this patch tested?

Add doctest and unit test.

Author: Huaxin Gao 

Closes #20968 from huaxingao/spark-23828.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9982505
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9982505
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9982505

Branch: refs/heads/master
Commit: e998250588de0df250e2800278da4d3e3705c259
Parents: b6935ff
Author: Huaxin Gao 
Authored: Fri Apr 6 11:51:36 2018 -0700
Committer: Bryan Cutler 
Committed: Fri Apr 6 11:51:36 2018 -0700

--
 python/pyspark/ml/feature.py | 88 ---
 python/pyspark/ml/tests.py   | 41 +-
 2 files changed, 104 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e9982505/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index fcb0dfc..5a3e0dd 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2342,9 +2342,38 @@ class StandardScalerModel(JavaModel, JavaMLReadable, 
JavaMLWritable):
 return self._call_java("mean")
 
 
+class _StringIndexerParams(JavaParams, HasHandleInvalid, HasInputCol, 
HasOutputCol):
+"""
+Params for :py:attr:`StringIndexer` and :py:attr:`StringIndexerModel`.
+"""
+
+stringOrderType = Param(Params._dummy(), "stringOrderType",
+"How to order labels of string column. The first 
label after " +
+"ordering is assigned an index of 0. Supported 
options: " +
+"frequencyDesc, frequencyAsc, alphabetDesc, 
alphabetAsc.",
+typeConverter=TypeConverters.toString)
+
+handleInvalid = Param(Params._dummy(), "handleInvalid", "how to handle 
invalid data (unseen " +
+  "or NULL values) in features and label column of 
string type. " +
+  "Options are 'skip' (filter out rows with invalid 
data), " +
+  "error (throw an error), or 'keep' (put invalid data 
" +
+  "in a special additional bucket, at index 
numLabels).",
+  typeConverter=TypeConverters.toString)
+
+def __init__(self, *args):
+super(_StringIndexerParams, self).__init__(*args)
+self._setDefault(handleInvalid="error", 
stringOrderType="frequencyDesc")
+
+@since("2.3.0")
+def getStringOrderType(self):
+"""
+Gets the value of :py:attr:`stringOrderType` or its default value 
'frequencyDesc'.
+"""
+return self.getOrDefault(self.stringOrderType)
+
+
 @inherit_doc
-class StringIndexer(JavaEstimator, HasInputCol, HasOutputCol, 
HasHandleInvalid, JavaMLReadable,
-JavaMLWritable):
+class StringIndexer(JavaEstimator, _StringIndexerParams, JavaMLReadable, 
JavaMLWritable):
 """
 A label indexer that maps a string column of labels to an ML column of 
label indices.
 If the input column is numeric, we cast it to string and index the string 
values.
@@ -2388,23 +2417,16 @@ class StringIndexer(JavaEstimator, HasInputCol, 
HasOutputCol, HasHandleInvalid,
 >>> sorted(set([(i[0], i[1]) for i in td.select(td.id, 
td.indexed).collect()]),
 ... key=lambda x: x[0])
 [(0, 2.0), (1, 1.0), (2, 0.0), (3, 2.0), (4, 2.0), (5, 0.0)]
+>>> fromlabelsModel = StringIndexerModel.from_labels(["a", "b", "c"],
+... inputCol="label", outputCol="indexed", handleInvalid="error")
+>>> result = fromlabelsModel.transform(stringIndDf)
+>>> sorted(set([(i[0], i[1]) for i in result.select(result.id, 
result.indexed).collect()]),
+... key=lambda x: x[0])
+[(0, 0.0), (1, 1.0), (2, 2.0), (3, 0.0), (4, 0.0), (5, 2.0)]
 
 .. versionadded:: 1.4.0
 """
 
-stringOrderType = Param(Params._dummy(), "stringOrderType",
-"How to order labels of string column. The first 
label after " +
-"ordering is assigned an index of 0. Supported 
options: " +
-"frequencyDesc, frequencyAsc, alphabetDesc, 
alphabetAsc.",
-typeConverter=TypeConverters.toString)
-
-

spark git commit: [SPARK-15009][PYTHON][FOLLOWUP] Add default param checks for CountVectorizerModel

2018-04-02 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 529f84710 -> 44a9f8e6e


[SPARK-15009][PYTHON][FOLLOWUP] Add default param checks for 
CountVectorizerModel

## What changes were proposed in this pull request?

Adding a test for default params for `CountVectorizerModel` constructed from a 
vocabulary.  This required that the param `maxDF` be added, which was done in 
SPARK-23615.
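
For context, a hedged sketch of the construction path being exercised (an active Spark 
session is assumed):

```python
from pyspark.ml.feature import CountVectorizerModel

model = CountVectorizerModel.from_vocabulary(["a", "b", "c"], inputCol="words",
                                             outputCol="features")
print(model.vocabulary)   # ['a', 'b', 'c']
```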

## How was this patch tested?

Added an explicit test for CountVectorizerModel in DefaultValuesTests.

Author: Bryan Cutler 

Closes #20942 from 
BryanCutler/pyspark-CountVectorizerModel-default-param-test-SPARK-15009.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44a9f8e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44a9f8e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44a9f8e6

Branch: refs/heads/master
Commit: 44a9f8e6e82c300dc61ca18515aee16f17f27501
Parents: 529f847
Author: Bryan Cutler 
Authored: Mon Apr 2 09:53:37 2018 -0700
Committer: Bryan Cutler 
Committed: Mon Apr 2 09:53:37 2018 -0700

--
 python/pyspark/ml/tests.py | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/44a9f8e6/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 6b4376c..c2c4861 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -2096,6 +2096,11 @@ class DefaultValuesTests(PySparkTestCase):
 # NOTE: disable check_params_exist until there is parity 
with Scala API
 ParamTests.check_params(self, cls(), 
check_params_exist=False)
 
+# Additional classes that need explicit construction
+from pyspark.ml.feature import CountVectorizerModel
+ParamTests.check_params(self, 
CountVectorizerModel.from_vocabulary(['a'], 'input'),
+check_params_exist=False)
+
 
 def _squared_distance(a, b):
 if isinstance(a, Vector):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23699][PYTHON][SQL] Raise same type of error caught with Arrow enabled

2018-03-27 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master c68ec4e6a -> ed72badb0


[SPARK-23699][PYTHON][SQL] Raise same type of error caught with Arrow enabled

## What changes were proposed in this pull request?

When using Arrow for createDataFrame or toPandas and an error is encountered 
with fallback disabled, this will raise the same type of error instead of a 
RuntimeError.  This change also allows for the traceback of the error to be 
retained and prevents the accidental chaining of exceptions with Python 3.
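
A hedged sketch of the user-visible effect; the config names appear in the diff below, 
and `problem_df` stands in for a hypothetical DataFrame whose schema Arrow conversion 
cannot handle:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")

try:
    pdf = problem_df.toPandas()
except Exception as e:
    # Previously this was always wrapped in a RuntimeError; now the original
    # exception type (and its traceback) propagates.
    print(type(e).__name__, e)
```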

## How was this patch tested?

Updated existing tests to verify error type.

Author: Bryan Cutler 

Closes #20839 from BryanCutler/arrow-raise-same-error-SPARK-23699.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed72badb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed72badb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed72badb

Branch: refs/heads/master
Commit: ed72badb04a56d8046bbd185245abf5ae265ccfd
Parents: c68ec4e
Author: Bryan Cutler 
Authored: Tue Mar 27 20:06:12 2018 -0700
Committer: Bryan Cutler 
Committed: Tue Mar 27 20:06:12 2018 -0700

--
 python/pyspark/sql/dataframe.py | 25 +
 python/pyspark/sql/session.py   | 13 +++--
 python/pyspark/sql/tests.py | 10 +-
 python/pyspark/sql/utils.py |  6 ++
 4 files changed, 31 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ed72badb/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3fc194d..16f8e52 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2007,7 +2007,7 @@ class DataFrame(object):
 "toPandas attempted Arrow optimization because "
 "'spark.sql.execution.arrow.enabled' is set to true; 
however, "
 "failed by the reason below:\n  %s\n"
-"Attempts non-optimization as "
+"Attempting non-optimization as "
 "'spark.sql.execution.arrow.fallback.enabled' is set 
to "
 "true." % _exception_message(e))
 warnings.warn(msg)
@@ -2015,11 +2015,12 @@ class DataFrame(object):
 else:
 msg = (
 "toPandas attempted Arrow optimization because "
-"'spark.sql.execution.arrow.enabled' is set to true; 
however, "
-"failed by the reason below:\n  %s\n"
-"For fallback to non-optimization automatically, 
please set true to "
-"'spark.sql.execution.arrow.fallback.enabled'." % 
_exception_message(e))
-raise RuntimeError(msg)
+"'spark.sql.execution.arrow.enabled' is set to true, 
but has reached "
+"the error below and will not continue because 
automatic fallback "
+"with 'spark.sql.execution.arrow.fallback.enabled' has 
been set to "
+"false.\n  %s" % _exception_message(e))
+warnings.warn(msg)
+raise
 
 # Try to use Arrow optimization when the schema is supported and 
the required version
 # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is 
enabled.
@@ -2042,12 +2043,12 @@ class DataFrame(object):
 # be executed. So, simply fail in this case for now.
 msg = (
 "toPandas attempted Arrow optimization because "
-"'spark.sql.execution.arrow.enabled' is set to true; 
however, "
-"failed unexpectedly:\n  %s\n"
-"Note that 
'spark.sql.execution.arrow.fallback.enabled' does "
-"not have an effect in such failure in the middle of "
-"computation." % _exception_message(e))
-raise RuntimeError(msg)
+"'spark.sql.execution.arrow.enabled' is set to true, 
but has reached "
+"the error below and can not continue. Note that "
+"'spark.sql.execution.arrow.fallback.enabled' does not 
have an effect "
+"on failures in the middle of computation.\n  %s" % 
_exception_message(e))
+warnings.warn(msg)
+raise
 
 # Below is toPandas without Arrow optimization.
 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)


spark git commit: [SPARK-23162][PYSPARK][ML] Add r2adj into Python API in LinearRegressionSummary

2018-03-26 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master b30a7d28b -> 3e778f5a9


[SPARK-23162][PYSPARK][ML] Add r2adj into Python API in LinearRegressionSummary

## What changes were proposed in this pull request?

Adding r2adj to LinearRegressionSummary in the Python API.
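
A small sketch of the new property (an active Spark session is assumed; the property is 
available from Spark 2.4.0):

```python
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

df = spark.createDataFrame(
    [(1.0, Vectors.dense(1.0)), (2.1, Vectors.dense(2.0)), (3.0, Vectors.dense(3.0))],
    ["label", "features"])
summary = LinearRegression(maxIter=10).fit(df).summary
print(summary.r2, summary.r2adj)   # r2adj applies the adjusted R^2 correction
```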

## How was this patch tested?

Added unit tests to exercise the API calls for the summary classes in tests.py.

Author: Kevin Yu 

Closes #20842 from kevinyu98/spark-23162.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e778f5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e778f5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e778f5a

Branch: refs/heads/master
Commit: 3e778f5a91b0553b09fe0e0ee84d771a71504960
Parents: b30a7d2
Author: Kevin Yu 
Authored: Mon Mar 26 15:45:27 2018 -0700
Committer: Bryan Cutler 
Committed: Mon Mar 26 15:45:27 2018 -0700

--
 python/pyspark/ml/regression.py | 18 --
 python/pyspark/ml/tests.py  |  1 +
 2 files changed, 17 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3e778f5a/python/pyspark/ml/regression.py
--
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index de0a0fa..9a66d87 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -336,10 +336,10 @@ class LinearRegressionSummary(JavaWrapper):
 @since("2.0.0")
 def r2(self):
 """
-Returns R^2^, the coefficient of determination.
+Returns R^2, the coefficient of determination.
 
 .. seealso:: `Wikipedia coefficient of determination \
-`
+`_
 
 .. note:: This ignores instance weights (setting all to 1.0) from
 `LinearRegression.weightCol`. This will change in later Spark
@@ -348,6 +348,20 @@ class LinearRegressionSummary(JavaWrapper):
 return self._call_java("r2")
 
 @property
+@since("2.4.0")
+def r2adj(self):
+"""
+Returns Adjusted R^2, the adjusted coefficient of determination.
+
+.. seealso:: `Wikipedia coefficient of determination, Adjusted R^2 \
+
`_
+
+.. note:: This ignores instance weights (setting all to 1.0) from
+`LinearRegression.weightCol`. This will change in later Spark 
versions.
+"""
+return self._call_java("r2adj")
+
+@property
 @since("2.0.0")
 def residuals(self):
 """

http://git-wip-us.apache.org/repos/asf/spark/blob/3e778f5a/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index cf1ffa1..6b4376c 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1559,6 +1559,7 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertAlmostEqual(s.meanSquaredError, 0.0)
 self.assertAlmostEqual(s.rootMeanSquaredError, 0.0)
 self.assertAlmostEqual(s.r2, 1.0, 2)
+self.assertAlmostEqual(s.r2adj, 1.0, 2)
 self.assertTrue(isinstance(s.residuals, DataFrame))
 self.assertEqual(s.numInstances, 2)
 self.assertEqual(s.degreesOfFreedom, 1)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23615][ML][PYSPARK] Add maxDF Parameter to Python CountVectorizer

2018-03-23 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 95c03cbd2 -> a33655348


[SPARK-23615][ML][PYSPARK] Add maxDF Parameter to Python CountVectorizer

## What changes were proposed in this pull request?

The maxDF parameter is for filtering out frequently occurring terms. This param 
was recently added to the Scala CountVectorizer and needs to be added to Python 
also.
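
A hedged usage sketch of the new param (an active Spark session is assumed; the param is 
available from Spark 2.4.0):

```python
from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame(
    [(["a", "b", "c"],), (["a", "b"],), (["a"],)], ["words"])
# maxDF=2.0: ignore terms that appear in more than two documents.
cv = CountVectorizer(inputCol="words", outputCol="features", maxDF=2.0)
model = cv.fit(df)
print(model.vocabulary)   # "a" appears in all three documents, so it is dropped
```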

## How was this patch tested?

add test

Author: Huaxin Gao 

Closes #20777 from huaxingao/spark-23615.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3365534
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3365534
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3365534

Branch: refs/heads/master
Commit: a33655348c4066d9c1d8ad2055aadfbc892ba7fd
Parents: 95c03cb
Author: Huaxin Gao 
Authored: Fri Mar 23 15:58:48 2018 -0700
Committer: Bryan Cutler 
Committed: Fri Mar 23 15:58:48 2018 -0700

--
 .../spark/ml/feature/CountVectorizer.scala  | 20 +-
 python/pyspark/ml/feature.py| 40 +++-
 python/pyspark/ml/tests.py  | 25 
 3 files changed, 67 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a3365534/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
index 60a4f91..9e0ed43 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -70,19 +70,21 @@ private[feature] trait CountVectorizerParams extends Params 
with HasInputCol wit
   def getMinDF: Double = $(minDF)
 
   /**
-   * Specifies the maximum number of different documents a term must appear in 
to be included
-   * in the vocabulary.
-   * If this is an integer greater than or equal to 1, this specifies the 
number of documents
-   * the term must appear in; if this is a double in [0,1), then this 
specifies the fraction of
-   * documents.
+   * Specifies the maximum number of different documents a term could appear 
in to be included
+   * in the vocabulary. A term that appears more than the threshold will be 
ignored. If this is an
+   * integer greater than or equal to 1, this specifies the maximum number of 
documents the term
+   * could appear in; if this is a double in [0,1), then this specifies the 
maximum fraction of
+   * documents the term could appear in.
*
-   * Default: (2^64^) - 1
+   * Default: (2^63^) - 1
* @group param
*/
   val maxDF: DoubleParam = new DoubleParam(this, "maxDF", "Specifies the 
maximum number of" +
-" different documents a term must appear in to be included in the 
vocabulary." +
-" If this is an integer >= 1, this specifies the number of documents the 
term must" +
-" appear in; if this is a double in [0,1), then this specifies the 
fraction of documents.",
+" different documents a term could appear in to be included in the 
vocabulary." +
+" A term that appears more than the threshold will be ignored. If this is 
an integer >= 1," +
+" this specifies the maximum number of documents the term could appear 
in;" +
+" if this is a double in [0,1), then this specifies the maximum fraction 
of" +
+" documents the term could appear in.",
 ParamValidators.gtEq(0.0))
 
   /** @group getParam */

http://git-wip-us.apache.org/repos/asf/spark/blob/a3365534/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index a1ceb7f..fcb0dfc 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -422,6 +422,14 @@ class _CountVectorizerParams(JavaParams, HasInputCol, 
HasOutputCol):
 " If this is an integer >= 1, this specifies the number of documents 
the term must" +
 " appear in; if this is a double in [0,1), then this specifies the 
fraction of documents." +
 " Default 1.0", typeConverter=TypeConverters.toFloat)
+maxDF = Param(
+Params._dummy(), "maxDF", "Specifies the maximum number of" +
+" different documents a term could appear in to be included in the 
vocabulary." +
+" A term that appears more than the threshold will be ignored. If this 
is an" +
+" integer >= 1, this specifies the maximum number of documents the 
term could appear in;" +
+" if this is a double in [0,1), then this specifies the maximum" +
+" fraction of documents the term could appear 

spark git commit: [SPARK-23691][PYTHON] Use sql_conf util in PySpark tests where possible

2018-03-19 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 5f4deff19 -> 566321852


[SPARK-23691][PYTHON] Use sql_conf util in PySpark tests where possible

## What changes were proposed in this pull request?

https://github.com/apache/spark/commit/d6632d185e147fcbe6724545488ad80dce20277e 
added a useful util

```python
@contextmanager
def sql_conf(self, pairs):
...
```

to allow configuration set/unset within a block:

```python
with self.sql_conf({"spark.blah.blah.blah": "blah"}):
# test codes
```

This PR proposes to use this util where possible in PySpark tests.

Note that there already appear to be a few places that affect tests without 
restoring the original value in the unittest classes.
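
For reference, a hedged sketch (not the exact implementation in the test base class) of 
what such a helper typically does: remember the current values, apply the new ones, and 
restore or unset them when the block exits.

```python
from contextlib import contextmanager


@contextmanager
def sql_conf(spark, pairs):
    """Temporarily set SQL confs from the dict `pairs`, restoring them afterwards."""
    keys = list(pairs.keys())
    old_values = [spark.conf.get(k, None) for k in keys]
    try:
        for k, v in pairs.items():
            spark.conf.set(k, v)
        yield
    finally:
        for k, old in zip(keys, old_values):
            if old is None:
                spark.conf.unset(k)
            else:
                spark.conf.set(k, old)
```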

## How was this patch tested?

Manually tested via:

```
./run-tests --modules=pyspark-sql --python-executables=python2
./run-tests --modules=pyspark-sql --python-executables=python3
```

Author: hyukjinkwon 

Closes #20830 from HyukjinKwon/cleanup-sql-conf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56632185
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56632185
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56632185

Branch: refs/heads/master
Commit: 566321852b2d60641fe86acbc8914b4a7063b58e
Parents: 5f4deff
Author: hyukjinkwon 
Authored: Mon Mar 19 21:25:37 2018 -0700
Committer: Bryan Cutler 
Committed: Mon Mar 19 21:25:37 2018 -0700

--
 python/pyspark/sql/tests.py | 130 +++
 1 file changed, 50 insertions(+), 80 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/56632185/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a0d547a..39d6c52 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2461,17 +2461,13 @@ class SQLTests(ReusedSQLTestCase):
 df1 = self.spark.range(1).toDF("a")
 df2 = self.spark.range(1).toDF("b")
 
-try:
-self.spark.conf.set("spark.sql.crossJoin.enabled", "false")
+with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
 self.assertRaises(AnalysisException, lambda: df1.join(df2, 
how="inner").collect())
 
-self.spark.conf.set("spark.sql.crossJoin.enabled", "true")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
 actual = df1.join(df2, how="inner").collect()
 expected = [Row(a=0, b=0)]
 self.assertEqual(actual, expected)
-finally:
-# We should unset this. Otherwise, other tests are affected.
-self.spark.conf.unset("spark.sql.crossJoin.enabled")
 
 # Regression test for invalid join methods when on is None, Spark-14761
 def test_invalid_join_method(self):
@@ -2943,21 +2939,18 @@ class SQLTests(ReusedSQLTestCase):
 self.assertPandasEqual(pdf, df.toPandas())
 
 orig_env_tz = os.environ.get('TZ', None)
-orig_session_tz = self.spark.conf.get('spark.sql.session.timeZone')
 try:
 tz = 'America/Los_Angeles'
 os.environ['TZ'] = tz
 time.tzset()
-self.spark.conf.set('spark.sql.session.timeZone', tz)
-
-df = self.spark.createDataFrame(pdf)
-self.assertPandasEqual(pdf, df.toPandas())
+with self.sql_conf({'spark.sql.session.timeZone': tz}):
+df = self.spark.createDataFrame(pdf)
+self.assertPandasEqual(pdf, df.toPandas())
 finally:
 del os.environ['TZ']
 if orig_env_tz is not None:
 os.environ['TZ'] = orig_env_tz
 time.tzset()
-self.spark.conf.set('spark.sql.session.timeZone', orig_session_tz)
 
 
 class HiveSparkSubmitTests(SparkSubmitTests):
@@ -3562,12 +3555,11 @@ class ArrowTests(ReusedSQLTestCase):
 self.assertTrue(all([c == 1 for c in null_counts]))
 
 def _toPandas_arrow_toggle(self, df):
-self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
-try:
+with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
 pdf = df.toPandas()
-finally:
-self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+
 pdf_arrow = df.toPandas()
+
 return pdf, pdf_arrow
 
 def test_toPandas_arrow_toggle(self):
@@ -3579,16 +3571,17 @@ class ArrowTests(ReusedSQLTestCase):
 
 def test_toPandas_respect_session_timezone(self):
 df = self.spark.createDataFrame(self.data, schema=self.schema)
-orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
-try:
-timezone = "America/New_York"
-

spark-website git commit: Update committer pages

2018-02-22 Thread cutlerb
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 3f874c90a -> 6853fd7c6


Update committer pages


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/6853fd7c
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/6853fd7c
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/6853fd7c

Branch: refs/heads/asf-site
Commit: 6853fd7c6964389606e64d0a98db204ba5d631de
Parents: 3f874c9
Author: Bryan Cutler 
Authored: Wed Feb 21 15:55:56 2018 -0800
Committer: Bryan Cutler 
Committed: Wed Feb 21 15:55:56 2018 -0800

--
 committers.md| 1 +
 site/committers.html | 4 
 2 files changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/6853fd7c/committers.md
--
diff --git a/committers.md b/committers.md
index 6c6b8ab..4098fb2 100644
--- a/committers.md
+++ b/committers.md
@@ -15,6 +15,7 @@ navigation:
 |Joseph Bradley|Databricks|
 |Felix Cheung|Microsoft|
 |Mosharaf Chowdhury|University of Michigan, Ann Arbor|
+|Bryan Cutler|IBM|
 |Jason Dai|Intel|
 |Tathagata Das|Databricks|
 |Ankur Dave|UC Berkeley|

http://git-wip-us.apache.org/repos/asf/spark-website/blob/6853fd7c/site/committers.html
--
diff --git a/site/committers.html b/site/committers.html
index c545cd2..83e275e 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -225,6 +225,10 @@
   University of Michigan, Ann Arbor
 
 
+  Bryan Cutler
+  IBM
+
+
   Jason Dai
   Intel
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org