[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19061
  
**[Test build #82441 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82441/testReport)**
 for PR 19061 at commit 
[`e2158ad`](https://github.com/apache/spark/commit/e2158ad13a5ff185bbe9dd26e4c00bc75db9dcfb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19425: [SPARK-22196][Core] Combine multiple input splits...

2017-10-04 Thread vgankidi
GitHub user vgankidi opened a pull request:

https://github.com/apache/spark/pull/19425

[SPARK-22196][Core] Combine multiple input splits into a HadoopPartition

## What changes were proposed in this pull request?

Spark native read path allows tuning the partition size based on 
spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes. It would 
be useful to add a similar functionality/behavior to HadoopRDD, i.e, pack 
multiple input splits into a single partition based on maxPartitionBytes and 
openCostInBytes. We have had several use-cases to merge small files by 
coalescing them by size to reduce the number of tasks launched.

## How was this patch tested?
Added a unit test. It was also tested manually in a few production jobs. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vgankidi/spark SPARK-22196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19425


commit 2f4e32681e50d4b42ed5b3d05d91e45483679bee
Author: Vinitha Gankidi 
Date:   2017-10-04T06:36:56Z

[SPARK-22196][Core] Combine multiple input splits into a HadoopPartition




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19061
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82441/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82443 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82443/testReport)**
 for PR 19083 at commit 
[`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).
 * This patch **fails due to an unknown error code, -9**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19061
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19424
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82444/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82442 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82442/testReport)**
 for PR 19083 at commit 
[`433f13b`](https://github.com/apache/spark/commit/433f13b03e995bbb47641b44ed1f7961cc4ea2ec).
 * This patch **fails due to an unknown error code, -9**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19424
  
**[Test build #82444 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82444/testReport)**
 for PR 19424 at commit 
[`75457f6`](https://github.com/apache/spark/commit/75457f608b9068ad1b3b3eb129f14d4e1b4ed946).
 * This patch **fails due to an unknown error code, -9**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19424
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82442/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82443/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-10-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/18704
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19425: [SPARK-22196][Core] Combine multiple input splits into a...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19425
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19083
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82445 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82445/testReport)**
 for PR 19083 at commit 
[`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-10-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18704


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82446 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82446/testReport)**
 for PR 19083 at commit 
[`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19426: [SPARK-22190][CORE] Add Spark executor task metri...

2017-10-04 Thread LucaCanali
GitHub user LucaCanali opened a pull request:

https://github.com/apache/spark/pull/19426

[SPARK-22190][CORE] Add Spark executor task metrics to Dropwizard metrics

## What changes were proposed in this pull request?

This proposed patch is about making Spark executor task metrics available 
as Dropwizard metrics. This is intended to be of aid in monitoring Spark jobs 
and when drilling down on performance troubleshooting issues.

## How was this patch tested?

Manually tested on a Spark cluster (see JIRA for an example screenshot).


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/LucaCanali/spark SparkTaskMetricsDropWizard

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19426.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19426


commit ca43764327e8796ea01d5053fc73ca7136b57835
Author: LucaCanali 
Date:   2017-10-03T13:48:44Z

Add Spark executor task metrics to Dropwizard metrics




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19426: [SPARK-22190][CORE] Add Spark executor task metrics to D...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19426
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142586717
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
+assert len(out.columns) == len(arrow_return_types), \
+'Number of columns of the returned pd.DataFrame doesn\'t 
match output schema. ' \
--- End diff --

little nit: `pd.DataFrame` -> `pandas.DataFrame`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142591369
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
--- End diff --

little nit (see 
https://github.com/databricks/scala-style-guide#pattern-matching):

```scala
  case s: StructType => s.map { case StructField(name, dataType, 
nullable, metadata) =>
AttributeReference(name, dataType, nullable, metadata)()
  }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142598216
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

Hm.. why did we change this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142586105
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

To me, I don't have a strong preference but I kind of tend to think it's 
okay as is, for now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142593679
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
--- End diff --

I'd say something like ... `verify_result_type`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142591629
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
--- End diff --

little nit:

```scala
 val groupingAttributes: Seq[Attribute] = groupingExprs.map { case ne: 
NamedExpression =>
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142591870
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
+
+Dataset.ofRows(
--- End diff --

little nit: I'd write as `Dataset.ofRows(df.sparkSession, plan)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142585501
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Adding comments for `FlatMapGroupsInPandas` sounds a good suggestion BTW.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142586820
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
--- End diff --

little nit: `the user function` -> `the user-defined function`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142586761
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

I'd say `result` instead of `out`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142592031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
--- End diff --

little nit: I'd write it 

```scala
val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, 
df.logicalPlan)
```

or

```scala
val plan = FlatMapGroupsInPandas(
  groupingAttributes, expr, output, df.logicalPlan)
```

if you wouldn't mind.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142588765
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

Hm... so, it looks we distinguish both groupping pandas udf and normal 
pandas udf here by `StructType`. I guess it wouldn't work with normal pandas 
udfs if I didn't miss something? This looks a bit odds.

I think we should at least add some comments here in quite more details and 
throws an clear exception. When I first saw, I thought this PR also supports 
normal pandas udfs with `StructType` somehow.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142584934
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
--- End diff --

little nit: `user-function` -> `user-defined function`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19424
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19424
  
**[Test build #82447 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82447/testReport)**
 for PR 19424 at commit 
[`75457f6`](https://github.com/apache/spark/commit/75457f608b9068ad1b3b3eb129f14d4e1b4ed946).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142610856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeSet, Expression, NamedExpression, SortOrder, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+grouping: Seq[Expression],
+func: Expression,
+override val output: Seq[Attribute],
+override val child: SparkPlan
+) extends UnaryExecNode {
+
+  val groupingAttributes: Seq[Attribute] = grouping.map {
+case ne: NamedExpression => ne.toAttribute
+  }
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
--- End diff --

I sent a pr to your repository to support these cases 
https://github.com/icexelloss/spark/pull/4.
Could you take a look at it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-10-04 Thread Kornel
Github user Kornel commented on the issue:

https://github.com/apache/spark/pull/16618
  
@MLnick I'm wondering what's the status of this issue: seems closed, have 
you any plans on picking it up again?

I might pick it up, but I'm not sure what's left: move from package mllib 
to ml and maybe a python API?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142620788
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
--- End diff --

This line is necessary in order to avoid serialization of `LDASuite` which 
is not serializable. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142622117
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
--- End diff --

Thanks. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624093
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
--- End diff --

Thanks. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624340
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
--- End diff --

Thanks. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624246
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
--- End diff --

Thanks. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624984
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

I believe, this will be settled down SPARK-22111.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142625490
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
+logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+expElogbetaBc.destroy(false)
+
 this
   }
 
   /**
-   * Update lambda based on the batch submitted. batchSize can be 
different for each iteration.
+   * Update lambda based on the batch submitted. nonEmptyDocsN can be 
different for each iteration.
--- End diff --

Thanks. Comment reverted. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82445 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82445/testReport)**
 for PR 19083 at commit 
[`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82445/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82446 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82446/testReport)**
 for PR 19083 at commit 
[`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82446/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142632240
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

Thanks for the good point. Do I understand correctly that if a batch 
without any non-empty docs is submitted, the `submitMiniBatch` method shouldn't 
change the state of `LDAOptimizer`? 
cc @WeichenXu123 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread akopich
Github user akopich commented on the issue:

https://github.com/apache/spark/pull/18924
  
@WeichenXu123. thank you


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread akopich
Github user akopich commented on the issue:

https://github.com/apache/spark/pull/18924
  
@hhbyyh, this change does not target performance but scalability, and I am 
afraid, the change is beneficial only for huge datasets and the tests would 
require massive computational resources. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-04 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r142634158
  
--- Diff: conf/spark-defaults.conf.template ---
@@ -19,9 +19,16 @@
 # This is useful for setting default environmental settings.
 
 # Example:
-# spark.master spark://master:7077
-# spark.eventLog.enabled   true
-# spark.eventLog.dir   hdfs://namenode:8021/directory
-# spark.serializer 
org.apache.spark.serializer.KryoSerializer
-# spark.driver.memory  5g
-# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+# spark.master  spark://master:7077
+# spark.eventLog.enabledtrue
+# spark.eventLog.dirhdfs://namenode:8021/directory
+# spark.serializer  
org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory   5g
+# spark.executor.extraJavaOptions   -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+
+#spark.ui.allowFramingFrom  https://example.com/
+#spark.ui.xXssProtection.enabled1; mode=block
--- End diff --

I would remove '.enabled' from the name then


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-04 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r142634239
  
--- Diff: conf/spark-defaults.conf.template ---
@@ -19,9 +19,16 @@
 # This is useful for setting default environmental settings.
 
 # Example:
-# spark.master spark://master:7077
-# spark.eventLog.enabled   true
-# spark.eventLog.dir   hdfs://namenode:8021/directory
-# spark.serializer 
org.apache.spark.serializer.KryoSerializer
-# spark.driver.memory  5g
-# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+# spark.master  spark://master:7077
+# spark.eventLog.enabledtrue
+# spark.eventLog.dirhdfs://namenode:8021/directory
+# spark.serializer  
org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory   5g
+# spark.executor.extraJavaOptions   -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+
+#spark.ui.allowFramingFrom  https://example.com/
+#spark.ui.xXssProtection.enabled1; mode=block
+#spark.ui.xContentType.options  nosniff
+
+#Enable below only when Spark is running on HTTPS
+#spark.ui.strictTransportSecurity.age   max-age=31536000
--- End diff --

I'd just remove '.age' then, likewise. It's the very value of the STS header


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18924
  
**[Test build #82448 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82448/testReport)**
 for PR 18924 at commit 
[`5c9547f`](https://github.com/apache/spark/commit/5c9547f6bc85b68d50b3c58a4a0d17eb6c75dcaf).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread akopich
Github user akopich commented on the issue:

https://github.com/apache/spark/pull/18924
  
BTW. Seems like `updateLambda` method relies (in older version as well) on 
`batchSize` only because this is `an optimization to avoid batch.count`. 
Shouldn't we rather use `nonEmptyDocsN` instead since we compute it efficietly 
now? But that is going to change logic...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19424
  
**[Test build #82447 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82447/testReport)**
 for PR 19424 at commit 
[`75457f6`](https://github.com/apache/spark/commit/75457f608b9068ad1b3b3eb129f14d4e1b4ed946).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19427: Reset spark.driver.bindAddress when starting a Ch...

2017-10-04 Thread ssaavedra
GitHub user ssaavedra opened a pull request:

https://github.com/apache/spark/pull/19427

Reset spark.driver.bindAddress when starting a Checkpoint

## What changes were proposed in this pull request?

It seems that recovering from a checkpoint can replace the old
driver and executor IP addresses, as the workload can now be taking
place in a different cluster configuration. It follows that the
bindAddress for the master may also have changed. Thus we should not be
keeping the old one, and instead be added to the list of properties to
reset and recreate from the new environment.

## How was this patch tested?

This patch was tested via manual testing on AWS, using the experimental 
(not yet merged) Kubernetes scheduler, which uses bindAddress to bind to a 
Kubernetes service (and thus was how I first encountered the bug too), but it 
is not a code-path related to the scheduler and this may have slipped through 
when merging SPARK-4563.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ssaavedra/spark fix-checkpointing-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19427


commit 892555f452173b73aeabc077749c4c32a7d4e504
Author: Santiago Saavedra 
Date:   2017-09-28T15:30:29Z

Reset spark.driver.bindAddress when starting a Checkpoint




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19424
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19424
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82447/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19427: Reset spark.driver.bindAddress when starting a Checkpoin...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19427
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18924
  
**[Test build #82448 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82448/testReport)**
 for PR 18924 at commit 
[`5c9547f`](https://github.com/apache/spark/commit/5c9547f6bc85b68d50b3c58a4a0d17eb6c75dcaf).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18924
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18924
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82448/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19428: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-04 Thread susanxhuynh
GitHub user susanxhuynh opened a pull request:

https://github.com/apache/spark/pull/19428

[SPARK-22131][MESOS] Mesos driver secrets

## What changes were proposed in this pull request?

The driver launches executors that have access to env or file-based secrets.

Most of the changes are a refactor of the `dispatcher` secrets support - 
placing it in a common place that can be used by both the dispatcher and 
drivers. The same goes for the unit tests.

## How was this patch tested?

Unit tests.
Tested in DC/OS.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mesosphere/spark sh-mesos-driver-secret

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19428.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19428


commit 4b7ae0a8e802f8ef6a159f3ce95b3203352b548f
Author: Susan X. Huynh 
Date:   2017-10-04T11:30:31Z

[SPARK-22131] Mesos driver secrets. The driver launches executors that have 
access to env or file-based secrets.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19090: [SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows ...

2017-10-04 Thread minixalpha
Github user minixalpha commented on the issue:

https://github.com/apache/spark/pull/19090
  
@jsnowacki I have already add comments to explain the quotes, could you 
help me review the comments? Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19428: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19428
  
**[Test build #82449 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82449/testReport)**
 for PR 19428 at commit 
[`dcbfc4a`](https://github.com/apache/spark/commit/dcbfc4a94b63130d1944dc423cad961f83e4a463).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19428: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19428
  
**[Test build #82449 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82449/testReport)**
 for PR 19428 at commit 
[`dcbfc4a`](https://github.com/apache/spark/commit/dcbfc4a94b63130d1944dc423cad961f83e4a463).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19428: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19428
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82449/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19428: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19428
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19090: [SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows ...

2017-10-04 Thread jsnowacki
Github user jsnowacki commented on the issue:

https://github.com/apache/spark/pull/19090
  
I think the comments are fine and sufficiently explain extra quotes 
existence.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19090: [SPARK-21877][DEPLOY, WINDOWS] Handle quotes in Windows ...

2017-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19090
  
Thanks for reviewing @jsnowacki, let me try to take a final look. I also 
checked what I could all but let me double check. Just want to be careful as 
it's the entry point.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19429: [SPARK-20055] [Docs] Added documentation for load...

2017-10-04 Thread jomach
GitHub user jomach opened a pull request:

https://github.com/apache/spark/pull/19429

[SPARK-20055] [Docs] Added documentation for loading csv files into 
DataFrames

 

## What changes were proposed in this pull request?

 Added documentation for loading csv files into Dataframes

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jomach/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19429.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19429


commit f5941bf196a36afe8715d713fcaaf3f1a136d9e8
Author: Jorge Machado 
Date:   2017-10-04T13:09:16Z

SPARK-20055 Documentation
 -Added documentation for loading csv files into Dataframes




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19429: [SPARK-20055] [Docs] Added documentation for loading csv...

2017-10-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19429
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142678914
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+groupingAttributes: Seq[Attribute],
+func: Expression,
+output: Seq[Attribute],
+child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val context = TaskContext.get()
+
+  val columnarBatchIter = new ArrowPythonRunner(
+chainedFunc, bufferSize, reuseWorker,
+PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+.compute(grouped.map(_._2), context.partitionId(), context)
+
+  val rowIter = new Iterator[InternalRow] {
+private var currentIter = if (columnarBatchIter.hasNext) {
+  val batch = columnarBatchIter.next()
+  batch.rowIterator.asScala
--- End diff --

The returned schema is checked on the python side. It will throw exception 
when serializer tries to coerce series types.

Here is the test that covers wrong return types:

https://github.com/icexelloss/spark/blob/groupby-apply-SPARK-20396/python/pyspark/sql/tests.py#L3458
 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142689702
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

I think it's a white space thing. Let me revert this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142690650
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
--- End diff --

Thanks! Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142690602
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142691179
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

Oh actually DataType() -> TimestampType(), let me double check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142692448
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

Reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142693843
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

Yes that is more consistent. Let me change that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142693686
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
--- End diff --

`verify_result_type` is kind of a misnomer because this function does:

1. convert the output of the user-defined function (pandas.DataFrame) to 
the form that the serialzer take (list of (pd.Series, DataType)) 

2. Validate the return value of the user-defined function.

Part of the verifying of the result type is done in the serializer in the 
process of coercing. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694381
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694484
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
--- End diff --

Good catch. Yeah let's keep such terms consistent. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694835
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
+assert len(out.columns) == len(arrow_return_types), \
+'Number of columns of the returned pd.DataFrame doesn\'t 
match output schema. ' \
--- End diff --

Good catch. Fixed. (Btw thanks for catching these small things)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695129
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Yes agreed. I will fix that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695501
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

+1. Let me add that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695929
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeSet, Expression, NamedExpression, SortOrder, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+grouping: Seq[Expression],
+func: Expression,
+override val output: Seq[Attribute],
+override val child: SparkPlan
+) extends UnaryExecNode {
+
+  val groupingAttributes: Seq[Attribute] = grouping.map {
+case ne: NamedExpression => ne.toAttribute
+  }
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
--- End diff --

Yes thanks much! I will take a look now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695843
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -111,6 +111,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with 
PredicateHelper {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+// FlatMapGroupsInPandas and be evaluated in python worker
--- End diff --

Good catch. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142697418
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

normal pandas doesn't support `StructType` as returnType that's why this 
works.

However, I agree the way we distinguish grouping udf and normal udf is not 
clean. Ideally we should have a cleaner way of defining such wrapping functions 
for different pandas_udf use cases. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-04 Thread krishna-pandey
Github user krishna-pandey commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r142701588
  
--- Diff: conf/spark-defaults.conf.template ---
@@ -19,9 +19,16 @@
 # This is useful for setting default environmental settings.
 
 # Example:
-# spark.master spark://master:7077
-# spark.eventLog.enabled   true
-# spark.eventLog.dir   hdfs://namenode:8021/directory
-# spark.serializer 
org.apache.spark.serializer.KryoSerializer
-# spark.driver.memory  5g
-# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+# spark.master  spark://master:7077
+# spark.eventLog.enabledtrue
+# spark.eventLog.dirhdfs://namenode:8021/directory
+# spark.serializer  
org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory   5g
+# spark.executor.extraJavaOptions   -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+
+#spark.ui.allowFramingFrom  https://example.com/
+#spark.ui.xXssProtection.enabled1; mode=block
+#spark.ui.xContentType.options  nosniff
+
+#Enable below only when Spark is running on HTTPS
+#spark.ui.strictTransportSecurity.age   max-age=31536000
--- End diff --

I understand that the "max-age" part is common across all values but don't 
want to tamper the value part for ease and any future compatibility. I will 
rather remove ".age" to avoid confusion arising out of this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142703487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142703829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
--- End diff --

Fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142704126
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142704642
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2017-10-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19041
  
**[Test build #82450 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82450/testReport)**
 for PR 19041 at commit 
[`a3af4bd`](https://github.com/apache/spark/commit/a3af4bd17a14e1f3f6fe543a44e977841fd72a93).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-04 Thread krishna-pandey
Github user krishna-pandey commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r142708896
  
--- Diff: conf/spark-defaults.conf.template ---
@@ -19,9 +19,16 @@
 # This is useful for setting default environmental settings.
 
 # Example:
-# spark.master spark://master:7077
-# spark.eventLog.enabled   true
-# spark.eventLog.dir   hdfs://namenode:8021/directory
-# spark.serializer 
org.apache.spark.serializer.KryoSerializer
-# spark.driver.memory  5g
-# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+# spark.master  spark://master:7077
+# spark.eventLog.enabledtrue
+# spark.eventLog.dirhdfs://namenode:8021/directory
+# spark.serializer  
org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory   5g
+# spark.executor.extraJavaOptions   -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+
+#spark.ui.allowFramingFrom  https://example.com/
+#spark.ui.xXssProtection.enabled1; mode=block
--- End diff --

@srowen renamed the keys as suggested. Thanks again for the review.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13893: [SPARK-14172][SQL] Hive table partition predicate not pa...

2017-10-04 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/13893
  
@heary-cao tried to resolve the same issue in 
https://github.com/apache/spark/pull/18969 

ping @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   >