[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19424
  
**[Test build #82444 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82444/testReport)** for PR 19424 at commit [`75457f6`](https://github.com/apache/spark/commit/75457f608b9068ad1b3b3eb129f14d4e1b4ed946).


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142592915
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+    groupingAttributes: Seq[Attribute],
+    func: Expression,
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute()
+
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+    val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+    val argOffsets = Array((0 until child.schema.length).toArray)
+
+    inputRDD.mapPartitionsInternal { iter =>
+      val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+      val context = TaskContext.get()
+
+      val columnarBatchIter = new ArrowPythonRunner(
+        chainedFunc, bufferSize, reuseWorker,
+        PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+        .compute(grouped.map(_._2), context.partitionId(), context)
+
+      val rowIter = new Iterator[InternalRow] {
+        private var currentIter = if (columnarBatchIter.hasNext) {
+          val batch = columnarBatchIter.next()
+          batch.rowIterator.asScala
--- End diff --

Don't we need to check the returned schema?
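For reference, the check being asked about might look like the following sketch (hedged: the diff does not show how the returned batch exposes its schema, and `assertReturnedSchema` is a name invented here for illustration):

```scala
import org.apache.spark.sql.types.StructType

// Minimal sketch: fail fast if the schema coming back from the Python
// worker differs from the schema the plan promised.
def assertReturnedSchema(expected: StructType, returned: StructType): Unit = {
  if (returned != expected) {
    throw new IllegalStateException(
      s"Pandas UDF returned schema $returned, expected $expected")
  }
}
```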


---




[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...

2017-10-03 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/19424

[SPARK-22197][SQL] push down operators to data source before planning

## What changes were proposed in this pull request?

As we discussed in https://github.com/apache/spark/pull/19136#discussion_r137023744, we should push down operators to the data source before planning, so that the data source can report more accurate statistics.
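As a toy illustration of the motivation (none of these names come from the PR's API): once a filter is pushed into the source, the source can report a smaller, more accurate row-count estimate for the planner to use.

```scala
// Invented stand-in for a data source that accepts pushed-down filters.
case class ToySource(rows: Seq[Int], filters: Seq[Int => Boolean] = Nil) {
  def pushFilter(f: Int => Boolean): ToySource = copy(filters = filters :+ f)
  // Statistics reported *after* pushdown reflect the filtered data.
  def estimatedRowCount: Long = rows.count(r => filters.forall(_(r))).toLong
}

object PushdownDemo extends App {
  val src = ToySource(1 to 1000)
  println(src.estimatedRowCount)                      // 1000 before pushdown
  println(src.pushFilter(_ > 900).estimatedRowCount)  // 100 after pushdown
}
```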

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark follow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19424


commit 571fad95c2dac6f70840da9c68cfb1be1358ecaf
Author: Wenchen Fan 
Date:   2017-09-27T12:50:15Z

improve documents and minor clean up

commit 75457f608b9068ad1b3b3eb129f14d4e1b4ed946
Author: Wenchen Fan 
Date:   2017-10-04T01:37:25Z

push down operators to data source before planning




---




[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19424
  
cc @rdblue @gatorsmile 


---




[GitHub] spark issue #17455: [Spark-20044][Web UI] Support Spark UI behind front-end ...

2017-10-03 Thread okoethibm
Github user okoethibm commented on the issue:

https://github.com/apache/spark/pull/17455
  
@jiangxb1987 I stopped working on Spark-related efforts a while ago and will not be following up on this PR any more. Anyone interested is free to take it over.


---




[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

2017-10-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19416#discussion_r142583033
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
+import org.apache.spark.sql.execution.ObjectOperator
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
+
+
+class FlatMapGroupsWithState_StateManager(
+    stateEncoder: ExpressionEncoder[Any],
+    shouldStoreTimestamp: Boolean) extends Serializable {
+
+  val stateSchema = {
+    val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true)
+    if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema
+  }
+
+  def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = {
+    val stateRow = store.get(keyRow)
+    stateDataForGets.withNew(
+      keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow))
+  }
+
+  def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = {
+    val stateRow = getStateRow(state)
+    setTimestamp(stateRow, timestamp)
+    store.put(keyRow, stateRow)
+  }
+
+  def removeState(store: StateStore, keyRow: UnsafeRow): Unit = {
+    store.remove(keyRow)
+  }
+
+  def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = {
+    val stateDataForGetAllState = FlatMapGroupsWithState_StateData()
+    store.getRange(None, None).map { pair =>
+      stateDataForGetAllState.withNew(
+        pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value))
+    }
+  }
+
+  private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes
+
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val nestedStateExpr = CreateNamedStruct(
+      stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e)))
+    if (shouldStoreTimestamp) {
+      Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP))
+    } else {
+      Seq(nestedStateExpr)
+    }
+  }
+
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = {
+    val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true)
+    val deser = stateEncoder.resolveAndBind().deserializer.transformUp {
+      case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
+    }
+    CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen()
+  }
+
+  private lazy val nestedStateOrdinal = 0
+  private lazy val timeoutTimestampOrdinal = 1
+
+  // Converters for translating state between rows and Java objects
+  private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
+    stateDeserializer, stateAttributes)
+  private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
+
+  private lazy val stateDataForGets = FlatMapGroupsWithState_StateData()
+
+  /** Returns the state as Java object if defined */
+  private def getStateObj(stateRow: UnsafeRow): Any = {
+i

[GitHub] spark pull request #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19422


---




[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19422
  
Merged to master


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142583949
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
+
+    def apply(self, udf):
+        """
+        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
+        as a :class:`DataFrame`.
+
+        The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`.
+        Each group is passed as a `pandas.DataFrame` to the user-function and the returned
+        `pandas.DataFrame` is combined as a :class:`DataFrame`. The returned `pandas.DataFrame`
+        can be of arbitrary length and its schema should match the returnType of the pandas udf.
+
+        :param udf: A wrapped function returned by `pandas_udf`
+
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))
+        >>> @pandas_udf(returnType=df.schema)
+        ... def normalize(pdf):
+        ...     v = pdf.v
+        ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        >>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
--- End diff --

And two spaces before the inlined comment.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142583906
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
       .map { case (attr, i) => attr.withName(s"_$i") })
 
+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

Ah, I see. Then please see my comment at 
https://github.com/apache/spark/pull/18732#discussion_r142578554.


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19404
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19404
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82439/
Test PASSed.


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19404
  
**[Test build #82439 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82439/testReport)** for PR 19404 at commit [`a2d5bc7`](https://github.com/apache/spark/commit/a2d5bc706987accaeb6c9516e7d4a07b5bb3104f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142583590
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
--- End diff --

Good catch.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142583338
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
      .map { case (attr, i) => attr.withName(s"_$i") })
 
+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

This was actually my first implementation. However, it turns out I cannot hold references to input rows without copying them, so `grouped` doesn't work (`grouped` uses an ArrayBuffer to keep the references).
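A self-contained sketch of the hazard described here (`MutableRow` is invented for illustration; Spark's UnsafeRow is similar in that the same instance is reused across iterator steps unless copied):

```scala
class MutableRow(var value: Int)

object GroupedCopyDemo extends App {
  val reused = new MutableRow(0)
  val input: Iterator[MutableRow] = (1 to 4).iterator.map { i =>
    reused.value = i  // the iterator hands out the same object every time
    reused
  }
  // grouped() buffers *references* (an ArrayBuffer underneath), so every
  // element of a batch ends up pointing at the last value written.
  val batches = input.grouped(2).map(_.map(_.value).toList).toList
  println(batches)  // List(List(2, 2), List(4, 4)), not List(List(1, 2), List(3, 4))
}
```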


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82443 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82443/testReport)** for PR 19083 at commit [`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82440/
Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82440 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82440/testReport)** for PR 18732 at commit [`21fed0d`](https://github.com/apache/spark/commit/21fed0dfeefe5775d193d7bb4176c38f0c2b91eb).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142582571
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -380,23 +380,26 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
-    if (ctx.isTooLongGeneratedFunction) {
-      logWarning("Found too long generated codes and JIT optimization might not work, " +
-        "Whole-stage codegen disabled for this plan, " +
-        "You can change the config spark.sql.codegen.MaxFunctionLength " +
-        "to adjust the function length limit:\n "
-        + s"$treeString")
-      return child.execute()
-    }
     // try to compile and fallback if it failed
-    try {
+    val (_, maxCodeSize) = try {
       CodeGenerator.compile(cleanedSource)
     } catch {
       case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
     }
+
+    // Check if compiled code has a too large function
+    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+        s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disable " +
+        s"for this plan. To avoid this, you can set the limit " +
--- End diff --

`set ` -> `raise `. then, remove `higher`


---




[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142582458
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -380,23 +380,26 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
-    if (ctx.isTooLongGeneratedFunction) {
-      logWarning("Found too long generated codes and JIT optimization might not work, " +
-        "Whole-stage codegen disabled for this plan, " +
-        "You can change the config spark.sql.codegen.MaxFunctionLength " +
-        "to adjust the function length limit:\n "
-        + s"$treeString")
-      return child.execute()
-    }
     // try to compile and fallback if it failed
-    try {
+    val (_, maxCodeSize) = try {
       CodeGenerator.compile(cleanedSource)
     } catch {
       case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
     }
+
+    // Check if compiled code has a too large function
+    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+        s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disable " +
--- End diff --

`disable ` -> `disabled `


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142581225
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
+
+    def apply(self, udf):
+        """
+        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
+        as a :class:`DataFrame`.
+
+        The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`.
+        Each group is passed as a `pandas.DataFrame` to the user-function and the returned
+        `pandas.DataFrame` is combined as a :class:`DataFrame`. The returned `pandas.DataFrame`
+        can be of arbitrary length and its schema should match the returnType of the pandas udf.
+
+        :param udf: A wrapped function returned by `pandas_udf`
+
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))
+        >>> @pandas_udf(returnType=df.schema)
+        ... def normalize(pdf):
+        ...     v = pdf.v
+        ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        >>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
--- End diff --

`# doctest: +SKIP`?


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82438/
Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82438/testReport)** for PR 18732 at commit [`4943ceb`](https://github.com/apache/spark/commit/4943ceb8b57041a7bf911e8f8637380c5fc263ff).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19327


---




[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...

2017-10-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/18931
  
ping @gatorsmile @cloud-fan for review. Thanks.


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-10-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17819
  
@gatorsmile Does the SQL change look good to you? Thanks.


---




[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/19327
  
LGTM. Merging to master.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142579512
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
@@ -519,3 +519,18 @@ case class CoGroup(
     outputObjAttr: Attribute,
     left: LogicalPlan,
     right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
+    groupingAttributes: Seq[Attribute],
+    functionExpr: Expression,
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  /**
+   * This is needed because output attributes is considered `reference` when
+   * passed through the constructor.
+   *
+   * Without this, catalyst will complain that output attributes are missing
+   * from the input.
+   */
+  override val producedAttributes = AttributeSet(output)
--- End diff --

Not quite sure I understand this correctly. Why is `output` considered part of `references`?
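For context, here is a toy model of the Catalyst convention behind this question (names simplified; see `QueryPlan.references` and `missingInput` in Spark for the real logic): attributes mentioned by a node's expressions count as references unless the node declares that it produces them itself, and references the child does not output are flagged as missing input.

```scala
case class ToyNode(exprAttrs: Set[String], produced: Set[String], childOutput: Set[String]) {
  def references: Set[String] = exprAttrs -- produced
  def missingInput: Set[String] = references -- childOutput
}

object ProducedAttributesDemo extends App {
  // Without producedAttributes, `out` looks like an unresolved reference:
  println(ToyNode(Set("out"), Set.empty, Set("in")).missingInput) // Set(out)
  // Declaring it as produced makes the complaint go away:
  println(ToyNode(Set("out"), Set("out"), Set("in")).missingInput) // Set()
}
```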


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82437/
Test PASSed.


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82437 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82437/testReport)** for PR 19083 at commit [`fca22b7`](https://github.com/apache/spark/commit/fca22b767fddb061303cddd4e06c87130b1b32dc).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19395: [SPARK-22171] [SQL] Describe Table Extended Failed when ...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19395
  
Thanks! Merged to master.


---




[GitHub] spark pull request #19395: [SPARK-22171] [SQL] Describe Table Extended Faile...

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19395


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82442 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82442/testReport)** for PR 19083 at commit [`433f13b`](https://github.com/apache/spark/commit/433f13b03e995bbb47641b44ed1f7961cc4ea2ec).


---




[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-03 Thread krishna-pandey
Github user krishna-pandey commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r142578623
  
--- Diff: core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---
@@ -89,6 +92,9 @@ private[spark] object JettyUtils extends Logging {
         val result = servletParams.responder(request)
         response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
         response.setHeader("X-Frame-Options", xFrameOptionsValue)
+        response.setHeader("X-XSS-Protection", xXssProtectionValue.get)
--- End diff --

@srowen Added a check to set the header only when the Option is defined, and tested locally. Thanks for the review.
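A minimal sketch of the Option-guarded pattern described here, assuming `xXssProtectionValue: Option[String]` as in the diff (`setOptionalHeader` and the `setHeader` parameter are stand-ins for the servlet call, not the PR's code):

```scala
def setOptionalHeader(setHeader: (String, String) => Unit,
                      xXssProtectionValue: Option[String]): Unit = {
  // Only set the header when a value was actually configured.
  xXssProtectionValue.foreach(value => setHeader("X-XSS-Protection", value))
}
```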


---





[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142578625
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
--- End diff --

`return GroupedData(jgd, self._df)`?


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142578554
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

If we really don't want to use `GroupedIterator` as suggested in 
https://github.com/apache/spark/pull/18732/files#r142577976, we should add a 
clear warning comment on it.


---




[GitHub] spark issue #19423: Branch 2.2

2017-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19423
  
@engineeyao, close this please.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142578363
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -111,6 +111,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+    // FlatMapGroupsInPandas and be evaluated in python worker
--- End diff --

The comment seems to have the wrong wording.


---




[GitHub] spark issue #15666: [SPARK-11421] [Core][Python][R] Added ability for addJar...

2017-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/15666
  
Thanks for asking this. I completely forgot this one. Will try to make some time to take a look within a few days.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142577976
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
       .map { case (attr, i) => attr.withName(s"_$i") })
 
+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

Should we simply use `iter.grouped(batchSize).map(_.toIterator)`?


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142577791
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

The behavior of this `BatchIterator` seems a bit weird and doesn't follow the usual `Iterator` contract. For example, you can keep calling its `next()` without ever consuming a batch to the end.
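A toy with the same shape as the `BatchIterator` in the diff makes the concern concrete (illustrative only, not the PR's code): `next()` returns a lazy slice over the shared underlying iterator, so calling `next()` again without draining the previous batch shifts where the next batch starts.

```scala
class ToyBatchIterator[T](iter: Iterator[T], batchSize: Int) extends Iterator[Iterator[T]] {
  override def hasNext: Boolean = iter.hasNext
  override def next(): Iterator[T] = iter.take(batchSize) // lazy view, not a copy
}

object BatchIteratorDemo extends App {
  val batches = new ToyBatchIterator((1 to 6).iterator, 2)
  val first = batches.next()
  val second = batches.next() // first batch never consumed
  println(second.toList)      // List(1, 2) -- it reads the elements `first` would have
}
```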


---




[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142577265
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -151,7 +151,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     }
   }
 
-  def genGroupByCodeGenContext(caseNum: Int): CodegenContext = {
+  def genGroupByCodeGenContext(caseNum: Int): CodeAndComment = {
--- End diff --

`genGroupByCodeGenContext` -> `genGroupByCode`.


---




[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142577237
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{Column, Dataset, Row}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import java.util.concurrent.ExecutionException
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodegenContext, CodeGenerator}
--- End diff --

We don't use `CodegenContext` anymore and can remove it.


---




[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142577111
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
@@ -333,33 +334,28 @@ class AggregateBenchmark extends BenchmarkBase {
       .sum()
       .collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+    benchmark.addCase(s"hugeMethodLimit = 8000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "8000")
       f()
     }
 
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1")
-      f()
-    }
-
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+    benchmark.addCase(s"hugeMethodLimit = 16000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "16000")
       f()
     }
 
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
-    Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
-    max function length of wholestagecodegen: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------------
-    codegen = F                                      462 /  533          1.4         704.4       1.0X
-    codegen = T maxLinesPerFunction = 1             3444 / 3447          0.2        5255.3       0.1X
-    codegen = T maxLinesPerFunction = 1500           447 /  478          1.5         682.1       1.0X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    max function bytecode size:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------------
+    hugeMethodLimit = 8000                         1043 / 1159          0.6        1591.5       1.0X
--- End diff --

Yea, you're right; I'll update it. Thanks!


---




[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142576983
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
@@ -333,33 +334,28 @@ class AggregateBenchmark extends BenchmarkBase {
       .sum()
       .collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+    benchmark.addCase(s"hugeMethodLimit = 8000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "8000")
       f()
     }
 
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1")
-      f()
-    }
-
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+    benchmark.addCase(s"hugeMethodLimit = 16000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "16000")
       f()
     }
 
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
-    Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
-    max function length of wholestagecodegen: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------------
-    codegen = F                                      462 /  533          1.4         704.4       1.0X
-    codegen = T maxLinesPerFunction = 1             3444 / 3447          0.2        5255.3       0.1X
-    codegen = T maxLinesPerFunction = 1500           447 /  478          1.5         682.1       1.0X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    max function bytecode size:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------------
+    hugeMethodLimit = 8000                         1043 / 1159          0.6        1591.5       1.0X
--- End diff --

Was the original `codegen = F` case removed? I think it is reasonable to keep it for comparison.


---




[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142576602
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -203,6 +203,10 @@ package object config {
   private[spark] val HISTORY_UI_MAX_APPS =
     ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
--- End diff --

While updating the PR, I noticed that `internal/config/package.scala` looks more natural for this.

Like [spark.ui.retainedTasks](https://github.com/apache/spark/pull/19061/files#diff-6bdad48cfc34314e89599655442ff210R198), I added it here.
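Hedged sketch only: the diff above truncates right after `ConfigBuilder(...)`, so the type and default below are assumptions about how the entry would be completed inside `internal/config/package.scala`, not the PR's actual text.

```scala
private[spark] val UI_SHOW_CONSOLE_PROGRESS =
  ConfigBuilder("spark.ui.showConsoleProgress")
    .booleanConf
    .createWithDefault(false) // assumed default, not confirmed by the diff
```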


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19083
  
fixed @gatorsmile 


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19061
  
**[Test build #82441 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82441/testReport)** for PR 19061 at commit [`e2158ad`](https://github.com/apache/spark/commit/e2158ad13a5ff185bbe9dd26e4c00bc75db9dcfb).


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571627
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
--- End diff --

indent
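For concreteness, the requested indentation fix would look like this (style only, same semantics; the names come from the diff above):

```scala
val logphatPartOptionBase = () =>
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
```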


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142572013
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
 
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
 
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

Instead of batchSize, I think we may use nonEmptyDocsN directly. @jkbradley please double check, since it will be a behavior change. Also please note that nonEmptyDocsN can be zero, so be careful about dividing by 0.
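A sketch of the guard being suggested (`averageLogphat` is a name invented here; `BDV` and `nonEmptyDocsN` come from the diff above):

```scala
import breeze.linalg.{DenseVector => BDV}

def averageLogphat(logphat: BDV[Double], nonEmptyDocsN: Long): BDV[Double] = {
  // Guard against a mini-batch that contained only empty documents.
  if (nonEmptyDocsN > 0) logphat / nonEmptyDocsN.toDouble else logphat
}
```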



---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142574222
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
 
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
 
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= batchSize.toDouble)
+    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+    expElogbetaBc.destroy(false)
+
     this
   }
 
   /**
-   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
+   * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration.
--- End diff --

Comments should be consistent with the code. Update the code or revert the comment change.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142574453
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
 
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
      }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
 
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

Should use nonEmptyDocsN to be consistent with the original implementation, and also to avoid division by zero.
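
A minimal Scala sketch of what that suggestion would look like, reusing the `logphatOption` and `nonEmptyDocsN` names from the diff above (an illustration, not the final patch):

```scala
import breeze.linalg.{DenseVector => BDV}

// Normalize logphat by the number of documents actually aggregated, not by
// the estimated batch size; the guard avoids a division by zero when the
// mini-batch contains no non-empty documents.
def normalizeLogphat(logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long): Unit = {
  if (nonEmptyDocsN > 0L) {
    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
  }
}
```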


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571342
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
--- End diff --

If it's only used once, reassigning it to a local variable is not necessary.
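
For context (my own illustration, not from this thread): the usual reason for copying a class field to a local `val` in Spark code is closure serialization; when the field is only referenced on the driver, that reason does not apply. A sketch of the pattern:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical example: referencing a class field inside an RDD closure drags
// the whole enclosing object into the serialized task, whereas a local val
// captures just the value.
class Scaler(val factor: Double) {
  def scale(rdd: RDD[Double]): RDD[Double] = {
    val factor = this.factor // local copy; the closure no longer needs `this`
    rdd.map(_ * factor)
  }
}
```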


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571728
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
--- End diff --

There is an extra space after `nonEmptyDocsN` (before the `:`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571603
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
--- End diff --

The comment is not quite accurate: if `optimizeDocConcentration == false`, logphat will not be calculated at all.
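
A small sketch of wording that matches the behavior, using the names from the diff (illustrative only; in the PR this is a `val` closing over the surrounding scope):

```scala
import breeze.linalg.{DenseVector => BDV}

// logphat is computed only when optimizeDocConcentration is enabled; when it
// is disabled the factory yields None and no per-document logphat work is done.
def logphatPartOptionBase(optimizeDocConcentration: Boolean, k: Int): () => Option[BDV[Double]] =
  () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
```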


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571685
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
--- End diff --

The indentation here is off; the second parameter should align with the first.
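
Concretely, something along these lines (a sketch of the indentation only; the body is unchanged from the diff):

```scala
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

val elementWiseSum = (
    u: (BDM[Double], Option[BDV[Double]], Long),
    v: (BDM[Double], Option[BDV[Double]], Long)) => {
  u._1 += v._1
  u._2.foreach(_ += v._2.get)
  (u._1, u._2, u._3 + v._3)
}
```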


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82436/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82436 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82436/testReport)**
 for PR 19327 at commit 
[`6d06374`](https://github.com/apache/spark/commit/6d063745a1501277018b04ef264e1e72949dc107).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82440 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82440/testReport)**
 for PR 18732 at commit 
[`21fed0d`](https://github.com/apache/spark/commit/21fed0dfeefe5775d193d7bb4176c38f0c2b91eb).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142572643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +63,22 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+
+val batchIter = if (batchSize > 0) {
+  new BatchIterator(iter, batchSize)
+} else {
+  Iterator(iter)
+}
--- End diff --

I changed it to one line; I think it's more concise.
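
For reference, the same choice expressed as a small standalone helper (a sketch; unlike the PR's `BatchIterator`, `grouped` materializes each chunk as a `Seq`):

```scala
// Slice an iterator into fixed-size batches when batchSize is positive;
// otherwise treat the whole input as a single batch.
def toBatches[T](iter: Iterator[T], batchSize: Int): Iterator[Iterator[T]] =
  if (batchSize > 0) iter.grouped(batchSize).map(_.iterator)
  else Iterator(iter)
```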


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142572356
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType
  */
 @InterfaceStability.Stable
 class RelationalGroupedDataset protected[sql](
-df: DataFrame,
+val df: DataFrame,
--- End diff --

Reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19404
  
**[Test build #82439 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82439/testReport)**
 for PR 19404 at commit 
[`a2d5bc7`](https://github.com/apache/spark/commit/a2d5bc706987accaeb6c9516e7d4a07b5bb3104f).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19423: Branch 2.2

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19423
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571653
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+import pyarrow as pa
--- End diff --

Removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571660
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+import pyarrow as pa
+
+arrow_return_types = list(to_arrow_type(field.dataType) for field 
in return_type)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19083
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82435/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19423: Branch 2.2

2017-10-03 Thread engineeyao
GitHub user engineeyao opened a pull request:

https://github.com/apache/spark/pull/19423

Branch 2.2

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/spark branch-2.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19423.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19423


commit 9cbf39f1c74f16483865cd93d6ffc3c521e878a7
Author: Yanbo Liang 
Date:   2017-05-25T12:15:15Z

[SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.

## What changes were proposed in this pull request?
Follow-up for #17218, some minor fix for PySpark ```FPGrowth```.

## How was this patch tested?
Existing UT.

Author: Yanbo Liang 

Closes #18089 from yanboliang/spark-19281.

(cherry picked from commit 913a6bfe4b0eb6b80a03b858ab4b2767194103de)
Signed-off-by: Yanbo Liang 

commit e01f1f222bcb7c469b1e1595e9338ed478d99894
Author: Yan Facai (颜发才) 
Date:   2017-05-25T13:40:39Z

[SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark 
FPGrowth.

## What changes were proposed in this pull request?

Expose numPartitions (expert) param of PySpark FPGrowth.

## How was this patch tested?

+ [x] Pass all unit tests.

Author: Yan Facai (颜发才) 

Closes #18058 from facaiy/ENH/pyspark_fpg_add_num_partition.

(cherry picked from commit 139da116f130ed21481d3e9bdee5df4b8d7760ac)
Signed-off-by: Yanbo Liang 

commit 022a4957d8dc8d6049e0a8c9191fcfd1bd95a4a4
Author: Lior Regev 
Date:   2017-05-25T16:08:19Z

[SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by 
SparkSubmit

## What changes were proposed in this pull request?

Deleted generated JARs archive after distribution to HDFS

## How was this patch tested?

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

Author: Lior Regev 

Closes #17986 from liorregev/master.

(cherry picked from commit 7306d556903c832984c7f34f1e8fe738a4b2343c)
Signed-off-by: Sean Owen 

commit 5ae1c652147aba9c5087335b0c6916a1035090b2
Author: hyukjinkwon 
Date:   2017-05-25T16:10:30Z

[SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid 
path check for sc.addJar on Windows

## What changes were proposed in this pull request?

This PR proposes two things:

- A follow up for SPARK-19707 (Improving the invalid path check for 
sc.addJar on Windows as well).

```
org.apache.spark.SparkContextSuite:
 - add jar with invalid path *** FAILED *** (32 milliseconds)
   2 was not equal to 1 (SparkContextSuite.scala:309)
   ...
```

- Fix path vs URI related test failures on Windows.

```
org.apache.spark.storage.LocalDirsSuite:
 - SPARK_LOCAL_DIRS override also affects driver *** FAILED *** (0 
milliseconds)
   new java.io.File("/NONEXISTENT_PATH").exists() was true 
(LocalDirsSuite.scala:50)
   ...

 - Utils.getLocalDir() throws an exception if any temporary directory 
cannot be retrieved *** FAILED *** (15 milliseconds)
   Expected exception java.io.IOException to be thrown, but no exception 
was thrown. (LocalDirsSuite.scala:64)
   ...
```

```
org.apache.spark.sql.hive.HiveSchemaInferenceSuite:
 - orc: schema should be inferred and saved when INFER_AND_SAVE is 
specified *** FAILED *** (203 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 
2: C:\projects\spark\target\tmp\spark-dae61ab3-a851-4dd3-bf4e-be97c501f254
   ...

 - parquet: schema should be inferred and saved when INFER_AND_SAVE is 
specified *** FAILED *** (203 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 
2: C:\projects\spark\target\tmp\spark-fa3aff89-a66e-4376-9a37-2a9b87596939
   ...

 - orc: schema should be inferred but not stored when INFER_ONLY is 
specified *** FAILED *** (141 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 
2: C:\projects\spark\target\tmp\spark-fb464e59-b049-481b-9c75-f53295c9fc2c
   ...

 - parquet: schema should be inferred but not stored when INFER_ONLY is 
specified *** FAILED *** (125 millis

[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82435 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82435/testReport)**
 for PR 19083 at commit 
[`dfde49b`](https://github.com/apache/spark/commit/dfde49bcc487ecbc0135cd301e8d9c3ad17921be).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142571432
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
--- End diff --

Thanks! Then I'll create a `deploy/config.scala` and put it there. The 
following are the existing `config.scala` files, but they seem irrelevant 
to `deploy/SparkSubmit.scala`.
```
$ find . -name config.scala
./core/src/main/scala/org/apache/spark/deploy/history/config.scala

./resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

./resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571075
  
--- Diff: python/pyspark/worker.py ---
@@ -32,8 +32,9 @@
 from pyspark.serializers import write_with_length, write_int, read_long, \
 write_long, read_int, SpecialLengths, PythonEvalType, 
UTF8Deserializer, PickleSerializer, \
 BatchedSerializer, ArrowStreamPandasSerializer
-from pyspark.sql.types import toArrowType
+from pyspark.sql.types import to_arrow_type
 from pyspark import shuffle
+from pyspark.sql.types import StructType
--- End diff --

Good catch. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571047
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+foo_udf = pandas_udf(
+foo,
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571038
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+foo_udf = pandas_udf(
+foo,
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v=df.v + 1)
+return ret
+
+@pandas_udf(StructType([StructField('id', LongType()), 
StructField('v', DoubleType())]))
+def foo(df):
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571056
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,133 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142570731
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+sysProps("spark.ui.showConsoleProgress") = "true"
+  }
+} else {
+  sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

Yep. I see. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142570436
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+sysProps("spark.ui.showConsoleProgress") = "true"
+  }
+} else {
+  sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

I don't see a need to prevent users from forcing this configuration if they 
want to.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142570378
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
--- End diff --

It should be added to the config package object like all other constants in 
core.
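
A sketch of what such a constant could look like in core's `org.apache.spark.internal.config` package object (the name and default here are my assumptions, not the merged change):

```scala
import org.apache.spark.internal.config.ConfigBuilder

private[spark] val UI_SHOW_CONSOLE_PROGRESS =
  ConfigBuilder("spark.ui.showConsoleProgress")
    .doc("Whether to show the progress bar in the console.")
    .booleanConf
    .createWithDefault(false)
```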


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82438 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82438/testReport)**
 for PR 18732 at commit 
[`4943ceb`](https://github.com/apache/spark/commit/4943ceb8b57041a7bf911e8f8637380c5fc263ff).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142570193
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+sysProps("spark.ui.showConsoleProgress") = "true"
+  }
+} else {
+  sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

Originally, I preferred blocking it here. Please comment back if this 
should be removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142570065
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+sysProps("spark.ui.showConsoleProgress") = "true"
+  }
+} else {
+  sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

Or, yes, it can be removed, because `SparkContext` also accepts the user's 
configuration. In that case, you're right that this is not needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142569400
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
--- End diff --

@vanzin, it seems there is no pre-defined config for `spark.ui.XXX`.
Is that a convention in Apache Spark? Which class would you prefer for adding a new 
config?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142569029
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+sysProps("spark.ui.showConsoleProgress") = "true"
+  }
+} else {
+  sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

This will prevent user mistakes by overriding the user's 
`spark.ui.showConsoleProgress=true` when using the `SparkSubmit` CLI. In the case of 
`SparkSubmit`, only shells will accept the user's `spark.ui.showConsoleProgress`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142568710
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
--- End diff --

Thank you, @vanzin . I'll create a new config for this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82437 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82437/testReport)**
 for PR 19083 at commit 
[`fca22b7`](https://github.com/apache/spark/commit/fca22b767fddb061303cddd4e06c87130b1b32dc).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19083
  
Thanks, I'll update soon.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

2017-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19416#discussion_r142562531
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, 
GetStructField, IsNull, Literal, UnsafeRow}
+import org.apache.spark.sql.execution.ObjectOperator
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
+
+
+class FlatMapGroupsWithState_StateManager(
--- End diff --

Yes it is.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142561980
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
--- End diff --

Never mind, I was thinking about it wrongly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142561534
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
--- End diff --

Can the returned `pandas.DataFrame` be of arbitrary length? Since we apply it 
on a `GroupedData`, I'd think this `apply` should work like an aggregation such as 
`count` or `avg`, and the returned `pandas.DataFrame` should have length 1?
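
For comparison (my own example, not from the thread): the typed Scala API already lets each group produce any number of rows via `flatMapGroups`, which is the flat-map-like semantics the PR appears to aim for, rather than a one-row aggregation:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
import spark.implicits._

// Each group may emit any number of rows, not just one aggregated row.
val ds = Seq((0, 1.0), (0, 2.0), (1, 3.0)).toDS() // (key, value) pairs
val out = ds.groupByKey(_._1).flatMapGroups { (key, rows) =>
  rows.map { case (_, v) => (key, v * 2) }        // arbitrary-length output per group
}
out.show()
```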


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-03 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/18924
  
Oh, sorry for that; it should be waiting for @jkbradley to merge it. Don't worry, 
I will contact him!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18801: SPARK-10878 Fix race condition when multiple clients res...

2017-10-03 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/18801
  
gentle ping @Victsm 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82436 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82436/testReport)**
 for PR 19327 at commit 
[`6d06374`](https://github.com/apache/spark/commit/6d063745a1501277018b04ef264e1e72949dc107).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19083
  
A few minor comments; otherwise LGTM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19404
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19404
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82434/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19404
  
**[Test build #82434 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82434/testReport)**
 for PR 19404 at commit 
[`f945f39`](https://github.com/apache/spark/commit/f945f39438c6a1cabff3f18489dd2082c4cba24c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


