[GitHub] spark pull request #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19243#discussion_r139602239
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -998,33 +998,39 @@ setMethod("unique",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' collect(sample(df, fraction=0.5))
 #' collect(sample(df, FALSE, 0.5))
-#' collect(sample(df, TRUE, 0.5))
+#' collect(sample(df, TRUE, 0.5, seed=3))
 #'}
 #' @note sample since 1.4.0
 setMethod("sample",
-  signature(x = "SparkDataFrame", withReplacement = "logical",
-fraction = "numeric"),
-  function(x, withReplacement, fraction, seed) {
-if (fraction < 0.0) stop(cat("Negative fraction value:", fraction))
+  signature(x = "SparkDataFrame"),
+  function(x, withReplacement = FALSE, fraction, seed) {
+if (!is.numeric(fraction)) {
+  stop(paste("fraction must be numeric; however, got", class(fraction)))
+}
+if (!is.logical(withReplacement)) {
+  stop(paste("withReplacement must be logical; however, got", class(withReplacement)))
+}
 if (!missing(seed)) {
   # TODO : Figure out how to send integer as java.lang.Long to JVM so
   # we can send seed as an argument through callJMethod
-  sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction, as.integer(seed))
+  sdf <- handledCallJMethod(x@sdf, "sample", withReplacement,
--- End diff --

I'd then wrap `withReplacement` as `as.logical(withReplacement)` and 
`fraction` as `as.numeric(fraction)`
because it might be coercible (note the L)
```
> is.numeric(1L)
[1] TRUE
```
but passing as integer could cause callJMethod to match to a different 
signature on the JVM.
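
A minimal sketch of the concern, written in Scala for the JVM side. The `pick` dispatcher below is hypothetical and is not SparkR's actual backend; it only assumes that a method is chosen from the runtime classes of the boxed arguments, roughly as a reflection-based call might do. Under that assumption an integer argument does not land on the `(Boolean, Double)` signature that a double would.

```scala
object SignatureMatchSketch {
  // Hypothetical dispatcher: picks a signature purely from the runtime
  // classes of the (boxed) arguments, with no numeric widening.
  def pick(args: Seq[Any]): String = args match {
    case Seq(_: Boolean, _: Double) => "sample(Boolean, Double)"
    case Seq(_: Boolean, _: Double, _: Long) => "sample(Boolean, Double, Long)"
    case other =>
      "no match for (" + other.map(_.getClass.getSimpleName).mkString(", ") + ")"
  }

  def main(args: Array[String]): Unit = {
    // A double fraction matches the intended signature.
    println(pick(Seq[Any](false, 0.5)))
    // An integer fraction (like 1L in R) is boxed as java.lang.Integer.
    println(pick(Seq[Any](false, 1)))
  }
}
```

Coercing on the R side with `as.numeric(fraction)` would correspond to always taking the first branch here.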


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19243#discussion_r139602407
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -998,33 +998,39 @@ setMethod("unique",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' collect(sample(df, fraction=0.5))
 #' collect(sample(df, FALSE, 0.5))
-#' collect(sample(df, TRUE, 0.5))
+#' collect(sample(df, TRUE, 0.5, seed=3))
 #'}
 #' @note sample since 1.4.0
 setMethod("sample",
-  signature(x = "SparkDataFrame", withReplacement = "logical",
-fraction = "numeric"),
-  function(x, withReplacement, fraction, seed) {
-if (fraction < 0.0) stop(cat("Negative fraction value:", fraction))
+  signature(x = "SparkDataFrame"),
+  function(x, withReplacement = FALSE, fraction, seed) {
+if (!is.numeric(fraction)) {
+  stop(paste("fraction must be numeric; however, got", class(fraction)))
+}
+if (!is.logical(withReplacement)) {
+  stop(paste("withReplacement must be logical; however, got", class(withReplacement)))
+}
 if (!missing(seed)) {
   # TODO : Figure out how to send integer as java.lang.Long to JVM so
   # we can send seed as an argument through callJMethod
-  sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction, as.integer(seed))
+  sdf <- handledCallJMethod(x@sdf, "sample", withReplacement,
+fraction, as.integer(seed))
--- End diff --

We should be careful to call `as.integer` on the seed only if it isn't NULL or NA:
```
> as.integer(NULL)
integer(0)
> as.integer(NA)
[1] NA
```


---




[GitHub] spark pull request #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19243#discussion_r139601709
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -984,12 +984,12 @@ setMethod("unique",
 #' of the total count of of the given SparkDataFrame.
 #'
 #' @param x A SparkDataFrame
-#' @param withReplacement Sampling with replacement or not
+#' @param withReplacement Sampling with replacement or not. Default is \code{FALSE}.
--- End diff --

We actually want to stop documenting the default value when it is already in 
the signature, because it then already appears in the roxygen2-generated doc.


---




[GitHub] spark pull request #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19243#discussion_r139601790
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -998,33 +998,39 @@ setMethod("unique",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' collect(sample(df, fraction=0.5))
 #' collect(sample(df, FALSE, 0.5))
-#' collect(sample(df, TRUE, 0.5))
+#' collect(sample(df, TRUE, 0.5, seed=3))
--- End diff --

I think the style here is to have one space between the param name and its 
value, like `seed = 3` and `fraction = 0.5` above


---




[GitHub] spark issue #19130: [SPARK-21917][CORE][YARN] Supporting adding http(s) reso...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19130
  
**[Test build #81912 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81912/testReport)**
 for PR 19130 at commit 
[`9a2c8c7`](https://github.com/apache/spark/commit/9a2c8c79451ab654e2a8fae8b67e4a31d9c8b97f).


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139600676
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Create the
+// Add the lower half
+var i = 0
+while (i < 50) {
+  input.setInt(0, i)
+  

[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139600490
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateArray, Literal, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
+
+  test("fails analysis if parameters are invalid") {
+def assertEqual[T](left: T, right: T): Unit = {
+  assert(left == right)
+}
+
+val wrongColumnTypes = Seq(BinaryType, BooleanType, StringType, 
ArrayType(IntegerType),
+  MapType(IntegerType, IntegerType), StructType(Seq(StructField("s", 
IntegerType
+wrongColumnTypes.foreach { dataType =>
+  val wrongColumn = new ApproxCountDistinctForIntervals(
+AttributeReference("a", dataType)(),
+endpointsExpression = CreateArray(Seq(1, 10).map(Literal(_
+  assert(
+wrongColumn.checkInputDataTypes() match {
+  case TypeCheckFailure(msg)
+if msg.contains("requires (numeric or timestamp or date) 
type") => true
+  case _ => false
+})
+}
+
+var wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = Literal(0.5d))
+assert(
+  wrongEndpoints.checkInputDataTypes() match {
+case TypeCheckFailure(msg) if msg.contains("requires array type") 
=> true
+case _ => false
+  })
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Seq(AttributeReference("b", 
DoubleType)(
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The intervals provided must be constant literals"))
+
+wrongEndpoints = new ApproxCountDistinctForIntervals(
+  AttributeReference("a", DoubleType)(),
+  endpointsExpression = CreateArray(Array(10L).map(Literal(_
+assertEqual(
+  wrongEndpoints.checkInputDataTypes(),
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals"))
+  }
+
+  /** Create an ApproxCountDistinctForIntervals instance and an input and 
output buffer. */
+  private def createEstimator(
+  endpoints: Array[Double],
+  rsd: Double = 0.05,
+  dt: DataType = IntegerType): (ApproxCountDistinctForIntervals, 
InternalRow, InternalRow) = {
+val input = new SpecificInternalRow(Seq(dt))
+val aggFunc = ApproxCountDistinctForIntervals(
+  BoundReference(0, dt, nullable = true), 
CreateArray(endpoints.map(Literal(_))), rsd)
+val buffer = createBuffer(aggFunc)
+(aggFunc, input, buffer)
+  }
+
+  private def createBuffer(aggFunc: ApproxCountDistinctForIntervals): 
InternalRow = {
+val buffer = new 
SpecificInternalRow(aggFunc.aggBufferAttributes.map(_.dataType))
+aggFunc.initialize(buffer)
+buffer
+  }
+
+  test("merging ApproxCountDistinctForIntervals instances") {
+val (aggFunc, input, buffer1a) = createEstimator(Array[Double](0, 10, 
2000, 345678, 100))
+val buffer1b = createBuffer(aggFunc)
+val buffer2 = createBuffer(aggFunc)
+
+// Create the
--- End diff --

typo?


---


[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18805
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81911/
Test FAILed.


---




[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18805
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18805
  
**[Test build #81911 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81911/testReport)**
 for PR 18805 at commit 
[`38d4840`](https://github.com/apache/spark/commit/38d484095cb404e3aa2e05633089e087d3a9d2dc).
 * This patch **fails build dependency tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18805
  
**[Test build #81911 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81911/testReport)**
 for PR 18805 at commit 
[`38d4840`](https://github.com/apache/spark/commit/38d484095cb404e3aa2e05633089e087d3a9d2dc).


---




[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-09-18 Thread sitalkedia
Github user sitalkedia commented on the issue:

https://github.com/apache/spark/pull/18805
  
Updated with zstd-jni version 1.3.1-1 and also updated the license to 
include the zstd-jni license. @srowen - How does that look from a licensing 
perspective?


---




[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15544#discussion_r139599361
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
HyperLogLogPlusPlusHelper}
+import org.apache.spark.sql.types._
+
+/**
+ * This function counts the approximate number of distinct values (ndv) in
+ * intervals constructed from endpoints specified in 
`endpointsExpression`. The endpoints should be
+ * sorted into ascending order. E.g., given an array of endpoints
+ * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's 
for intervals
+ * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, 
endpoint_N].
+ * To count ndv's in these intervals, apply the HyperLogLogPlusPlus 
algorithm in each of them.
+ * @param child to estimate the ndv's of.
+ * @param endpointsExpression to construct the intervals, should be sorted 
into ascending order.
+ * @param relativeSD The maximum estimation error allowed in the 
HyperLogLogPlusPlus algorithm.
+ */
+case class ApproxCountDistinctForIntervals(
+child: Expression,
+endpointsExpression: Expression,
+relativeSD: Double = 0.05,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends ImperativeAggregate with ExpectsInputTypes {
+
+  def this(child: Expression, endpointsExpression: Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = 0.05,
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  def this(child: Expression, endpointsExpression: Expression, relativeSD: 
Expression) = {
+this(
+  child = child,
+  endpointsExpression = endpointsExpression,
+  relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
+  mutableAggBufferOffset = 0,
+  inputAggBufferOffset = 0)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = {
+Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+  }
+
+  // Mark as lazy so that endpointsExpression is not evaluated during tree 
transformation.
+  lazy val endpoints: Array[Double] =
+(endpointsExpression.dataType, endpointsExpression.eval()) match {
+  case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
+val numericArray = arrayData.toObjectArray(baseType)
+numericArray.map { x =>
+  baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
+}
+}
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val defaultCheck = super.checkInputDataTypes()
+if (defaultCheck.isFailure) {
+  defaultCheck
+} else if (!endpointsExpression.foldable) {
+  TypeCheckFailure("The intervals provided must be constant literals")
+} else if (endpoints.length < 2) {
+  TypeCheckFailure("The number of endpoints must be >= 2 to construct 
intervals")
+} else {
+  TypeCheckSuccess
+}
+  }
+
+  // N endpoints construct N-1 intervals, creating a HLLPP for each 
interval
+  private lazy val hllppArray = {
+val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1)
+for (i <- array.indices) {
+  array(i) = new 

[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19271
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81907/
Test FAILed.


---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19271
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19271
  
**[Test build #81907 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81907/testReport)**
 for PR 19271 at commit 
[`94b63fb`](https://github.com/apache/spark/commit/94b63fb4082c98ff4f2e67e0759e98f46a3537a5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  sealed trait JoinStateWatermarkPredicate `


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19068
  
**[Test build #81910 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81910/testReport)**
 for PR 19068 at commit 
[`c5c1c26`](https://github.com/apache/spark/commit/c5c1c2625d33dd08cbdde2e25041359bbbf50339).


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19068
  
retest this please


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread yaooqinn
Github user yaooqinn commented on the issue:

https://github.com/apache/spark/pull/19068
  
@cloud-fan I hit a linkage error before, and I have now simplified the logic. 
Could you trigger Jenkins before reverting?


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19068
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81909/
Test FAILed.


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19068
  
**[Test build #81909 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81909/testReport)**
 for PR 19068 at commit 
[`f2618b9`](https://github.com/apache/spark/commit/f2618b9bd8a8c5c6dcd50257f089e2412b30b252).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19068
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19068
  
**[Test build #81909 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81909/testReport)**
 for PR 19068 at commit 
[`f2618b9`](https://github.com/apache/spark/commit/f2618b9bd8a8c5c6dcd50257f089e2412b30b252).


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/18659
  
What if users have an older version of pyarrow installed? Shall we throw an 
exception and ask them to upgrade, or work around the type casting issue?


---




[GitHub] spark issue #19130: [SPARK-21917][CORE][YARN] Supporting adding http(s) reso...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19130
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19130: [SPARK-21917][CORE][YARN] Supporting adding http(s) reso...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19130
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81905/
Test FAILed.


---




[GitHub] spark issue #19130: [SPARK-21917][CORE][YARN] Supporting adding http(s) reso...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19130
  
**[Test build #81905 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81905/testReport)**
 for PR 19130 at commit 
[`580d587`](https://github.com/apache/spark/commit/580d5872bfa46c7c7b2d37ce6c4cf4568dc32050).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19273: Revert "[SPARK-21428] Turn IsolatedClientLoader off whil...

2017-09-18 Thread yaooqinn
Github user yaooqinn commented on the issue:

https://github.com/apache/spark/pull/19273
  
ok to me, more tests are needed on #18648 


---




[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19211
  
LGTM except some minor comments


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139594788
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -39,20 +41,13 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached 
listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events 
after stopping.
  */
-private[spark] class LiveListenerBus(conf: SparkConf) extends 
SparkListenerBus {
-
-  self =>
+private[spark] class LiveListenerBus(conf: SparkConf) {
--- End diff --

Also, `LiveListenerBus.addListener` no longer seems valid either.


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139594666
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -39,20 +41,13 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached 
listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events 
after stopping.
  */
-private[spark] class LiveListenerBus(conf: SparkConf) extends 
SparkListenerBus {
-
-  self =>
+private[spark] class LiveListenerBus(conf: SparkConf) {
--- End diff --

Is it still valid to call `LiveListenerBus.postToAll`? If not, shall we 
override `postToAll` and throw an exception?
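
A minimal sketch of what that suggestion could look like. The `SimpleListenerBus` trait and `QueueingBus` class are hypothetical, simplified stand-ins and not the actual Spark classes; the sketch only assumes a bus that accepts events through `post` and wants direct `postToAll` calls to fail loudly.

```scala
import scala.collection.mutable

object PostToAllSketch {
  // Hypothetical, simplified stand-in for a listener-bus base trait.
  trait SimpleListenerBus[E] {
    def postToAll(event: E): Unit
  }

  // Hypothetical bus that only accepts events via `post`, which enqueues them.
  class QueueingBus[E] extends SimpleListenerBus[E] {
    private val queue = mutable.Queue.empty[E]

    def post(event: E): Unit = { queue += event; () }

    def pending: Int = queue.size

    // Direct delivery is not supported on this bus, so fail loudly.
    override def postToAll(event: E): Unit =
      throw new UnsupportedOperationException(
        "postToAll is not supported; use post(event) instead")
  }
}
```

Throwing makes an accidental call visible immediately instead of silently bypassing the queue.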


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139594602
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala 
---
@@ -575,8 +575,6 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with TimeL
 
   test("getActive and getActiveOrCreate") {
 require(StreamingContext.getActive().isEmpty, "context exists from 
before")
-sc = new SparkContext(conf)
--- End diff --

why this change?


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139594359
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -42,59 +44,65 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
   private val mockSparkContext: SparkContext = 
Mockito.mock(classOf[SparkContext])
   private val mockMetricsSystem: MetricsSystem = 
Mockito.mock(classOf[MetricsSystem])
 
+  private def numDroppedEvents(bus: LiveListenerBus): Long = {
+
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
+  }
+
+  private def queueSize(bus: LiveListenerBus): Int = {
+
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
+  .asInstanceOf[Int]
+  }
+
+  private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
+
bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount()
+  }
+
   test("don't call sc.stop in listener") {
 sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
 val listener = new SparkContextStoppingListener(sc)
-val bus = new LiveListenerBus(sc.conf)
-bus.addListener(listener)
 
-// Starting listener bus should flush all buffered events
-bus.start(sc, sc.env.metricsSystem)
-bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+sc.listenerBus.addToSharedQueue(listener)
+sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, 
JobSucceeded))
+sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+sc.stop()
 
-bus.stop()
 assert(listener.sparkExSeen)
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
 val conf = new SparkConf()
 val counter = new BasicJobCounter
 val bus = new LiveListenerBus(conf)
-bus.addListener(counter)
+bus.addToSharedQueue(counter)
 
 // Metrics are initially empty.
 assert(bus.metrics.numEventsPosted.getCount === 0)
-assert(bus.metrics.numDroppedEvents.getCount === 0)
-assert(bus.metrics.queueSize.getValue === 0)
-assert(bus.metrics.eventProcessingTime.getCount === 0)
+assert(numDroppedEvents(bus) === 0)
+assert(queueSize(bus) === 0)
+assert(eventProcessingTimeCount(bus) === 0)
 
 // Post five events:
 (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, 
jobCompletionTime, JobSucceeded)) }
 
 // Five messages should be marked as received and queued, but no 
messages should be posted to
 // listeners yet because the the listener bus hasn't been started.
 assert(bus.metrics.numEventsPosted.getCount === 5)
-assert(bus.metrics.queueSize.getValue === 5)
+assert(queueSize(bus) === 5)
 assert(counter.count === 0)
 
 // Starting listener bus should flush all buffered events
 bus.start(mockSparkContext, mockMetricsSystem)
 Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
 bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 assert(counter.count === 5)
-assert(bus.metrics.queueSize.getValue === 0)
-assert(bus.metrics.eventProcessingTime.getCount === 5)
+assert(queueSize(bus) === 0)
+assert(eventProcessingTimeCount(bus) === 5)
 
 // After listener bus has stopped, posting events should not increment 
counter
 bus.stop()
 (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, 
jobCompletionTime, JobSucceeded)) }
 assert(counter.count === 5)
-assert(bus.metrics.numEventsPosted.getCount === 5)
-
-// Make sure per-listener-class timers were created:
--- End diff --

Oh, it seems we don't need it anymore.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-18 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r139594335
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
--- End diff --

According to [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81782/testReport/org.apache.spark.util.collection.unsafe.sort/UnsafeExternalSorterSuite/testOOMDuringSpill/), 
the test takes 7 ms, so I guess we're OK then :sunglasses:


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139594245
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -42,59 +44,65 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
   private val mockSparkContext: SparkContext = 
Mockito.mock(classOf[SparkContext])
   private val mockMetricsSystem: MetricsSystem = 
Mockito.mock(classOf[MetricsSystem])
 
+  private def numDroppedEvents(bus: LiveListenerBus): Long = {
+
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
+  }
+
+  private def queueSize(bus: LiveListenerBus): Int = {
+
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
+  .asInstanceOf[Int]
+  }
+
+  private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
+
bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount()
+  }
+
   test("don't call sc.stop in listener") {
 sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
 val listener = new SparkContextStoppingListener(sc)
-val bus = new LiveListenerBus(sc.conf)
-bus.addListener(listener)
 
-// Starting listener bus should flush all buffered events
-bus.start(sc, sc.env.metricsSystem)
-bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+sc.listenerBus.addToSharedQueue(listener)
+sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, 
JobSucceeded))
+sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+sc.stop()
 
-bus.stop()
 assert(listener.sparkExSeen)
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
 val conf = new SparkConf()
 val counter = new BasicJobCounter
 val bus = new LiveListenerBus(conf)
-bus.addListener(counter)
+bus.addToSharedQueue(counter)
 
 // Metrics are initially empty.
 assert(bus.metrics.numEventsPosted.getCount === 0)
-assert(bus.metrics.numDroppedEvents.getCount === 0)
-assert(bus.metrics.queueSize.getValue === 0)
-assert(bus.metrics.eventProcessingTime.getCount === 0)
+assert(numDroppedEvents(bus) === 0)
+assert(queueSize(bus) === 0)
+assert(eventProcessingTimeCount(bus) === 0)
 
 // Post five events:
 (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, 
jobCompletionTime, JobSucceeded)) }
 
 // Five messages should be marked as received and queued, but no 
messages should be posted to
 // listeners yet because the the listener bus hasn't been started.
 assert(bus.metrics.numEventsPosted.getCount === 5)
-assert(bus.metrics.queueSize.getValue === 5)
+assert(queueSize(bus) === 5)
 assert(counter.count === 0)
 
 // Starting listener bus should flush all buffered events
 bus.start(mockSparkContext, mockMetricsSystem)
 Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
 bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 assert(counter.count === 5)
-assert(bus.metrics.queueSize.getValue === 0)
-assert(bus.metrics.eventProcessingTime.getCount === 5)
+assert(queueSize(bus) === 0)
+assert(eventProcessingTimeCount(bus) === 5)
 
 // After listener bus has stopped, posting events should not increment 
counter
 bus.stop()
 (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, 
jobCompletionTime, JobSucceeded)) }
 assert(counter.count === 5)
-assert(bus.metrics.numEventsPosted.getCount === 5)
-
-// Make sure per-listener-class timers were created:
--- End diff --

do we still have this?


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15544
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15544
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81904/
Test PASSed.


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15544
  
**[Test build #81904 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81904/testReport)** for PR 15544 at commit [`0e9a8dd`](https://github.com/apache/spark/commit/0e9a8dd4004c3331271105e4e9a3df963c3bc84b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19145: [spark-21933][yarn] Spark Streaming request more executo...

2017-09-18 Thread klion26
Github user klion26 commented on the issue:

https://github.com/apache/spark/pull/19145
  
We enabled RM and NM recovery.

Suppose 2 containers are running on this NM. After 10 minutes, the RM 
detects the failure of the NM and relaunches the 2 lost containers on other 
NMs. This is OK. 

But if we then restart the RM, the lost containers on that NM are 
**reported to the RM as lost again** because of recovery, so we relaunch 2 more 
containers on other NMs and end up with 2 more executors than we expected.
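
To make the arithmetic concrete, here is a toy model of the double report. It is only a sketch under stated assumptions: `replacements_requested`, the container IDs, and the `dedupe` switch are hypothetical and not taken from Spark or YARN, and the de-duplicated variant is shown only for contrast, not as the change made in this PR.

```python
def replacements_requested(loss_reports, dedupe):
    """Count replacement containers requested for a sequence of loss reports.

    loss_reports: list of reports, each a list of lost container IDs.
    dedupe: if True, a container ID that was already handled is ignored.
    """
    seen = set()
    requested = 0
    for report in loss_reports:
        for container_id in report:
            if dedupe and container_id in seen:
                continue  # this loss was already handled by an earlier report
            seen.add(container_id)
            requested += 1
    return requested


# Hypothetical IDs for the 2 containers that were running on the failed NM.
lost = ["container_1", "container_2"]

# First report: the RM detects the NM failure.
# Second report: the same losses are replayed after the RM restarts with recovery.
reports = [lost, lost]

naive = replacements_requested(reports, dedupe=False)
deduped = replacements_requested(reports, dedupe=True)
extra_executors = naive - deduped
```

In this model the naive count asks for a replacement on every report, which reproduces the 2 extra executors described above.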


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139593321
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -173,80 +159,62 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
*/
   @throws(classOf[TimeoutException])
   def waitUntilEmpty(timeoutMillis: Long): Unit = {
-val finishTime = System.currentTimeMillis + timeoutMillis
-while (!queueIsEmpty) {
-  if (System.currentTimeMillis > finishTime) {
-throw new TimeoutException(
-  s"The event queue is not empty after $timeoutMillis 
milliseconds")
+val deadline = System.currentTimeMillis + timeoutMillis
+queues.asScala.foreach { queue =>
+  if (!queue.waitUntilEmpty(deadline)) {
+throw new TimeoutException(s"The event queue is not empty after 
$timeoutMillis ms.")
   }
-  /* Sleep rather than using wait/notify, because this is used only 
for testing and
-   * wait/notify add overhead in the general case. */
-  Thread.sleep(10)
 }
   }
 
   /**
-   * For testing only. Return whether the listener daemon thread is still 
alive.
-   * Exposed for testing.
-   */
-  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once 
belonged to this queue
-   * have already been processed by all attached listeners, if this 
returns true.
-   */
-  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && 
!processingEvent }
-
-  /**
* Stop the listener bus. It will wait until the queued events have been 
processed, but drop the
* new events after stopping.
*/
   def stop(): Unit = {
 if (!started.get()) {
-  throw new IllegalStateException(s"Attempted to stop $name that has 
not yet started!")
+  throw new IllegalStateException(s"Attempted to stop bus that has not 
yet started!")
 }
-if (stopped.compareAndSet(false, true)) {
-  // Call eventLock.release() so that listenerThread will poll `null` 
from `eventQueue` and know
-  // `stop` is called.
-  eventLock.release()
-  listenerThread.join()
-} else {
-  // Keep quiet
+
+if (!stopped.compareAndSet(false, true)) {
+  return
 }
-  }
 
-  /**
-   * If the event queue exceeds its capacity, the new events will be 
dropped. The subclasses will be
-   * notified with the dropped events.
-   *
-   * Note: `onDropEvent` can be called in any thread.
-   */
-  def onDropEvent(event: SparkListenerEvent): Unit = {
-metrics.numDroppedEvents.inc()
-droppedEventsCounter.incrementAndGet()
-if (logDroppedEvent.compareAndSet(false, true)) {
-  // Only log the following message once to avoid duplicated annoying 
logs.
-  logError("Dropping SparkListenerEvent because no remaining room in 
event queue. " +
-"This likely means one of the SparkListeners is too slow and 
cannot keep up with " +
-"the rate at which tasks are being started by the scheduler.")
+synchronized {
+  queues.asScala.foreach(_.stop())
+  queues.clear()
 }
-logTrace(s"Dropping event $event")
   }
+
+  private[spark] def findListenersByClass[T <: SparkListenerInterface : 
ClassTag](): Seq[T] = {
+queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
+  }
+
+  private[spark] def listeners: JList[SparkListenerInterface] = {
+queues.asScala.flatMap(_.listeners.asScala).asJava
+  }
+
+  // For testing only.
+  private[scheduler] def activeQueues(): Set[String] = {
+queues.asScala.map(_.name).toSet
+  }
+
 }
 
 private[spark] object LiveListenerBus {
   // Allows for Context to check whether stop() call is made within 
listener thread
   val withinListenerThread: DynamicVariable[Boolean] = new 
DynamicVariable[Boolean](false)
 
-  /** The thread name of Spark listener bus */
-  val name = "SparkListenerBus"
+  private[scheduler] val SHARED_QUEUE = "shared"
+
+  private[scheduler] val APP_STATUS_QUEUE = "appStatus"
+
+  private[scheduler] val EXECUTOR_MGMT_QUEUE = "executorMgmt"
--- End diff --

`MGMT` is hard to understand; how about `MANAGEMENT`?


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139593217
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -173,80 +159,62 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
*/
   @throws(classOf[TimeoutException])
   def waitUntilEmpty(timeoutMillis: Long): Unit = {
-val finishTime = System.currentTimeMillis + timeoutMillis
-while (!queueIsEmpty) {
-  if (System.currentTimeMillis > finishTime) {
-throw new TimeoutException(
-  s"The event queue is not empty after $timeoutMillis 
milliseconds")
+val deadline = System.currentTimeMillis + timeoutMillis
+queues.asScala.foreach { queue =>
+  if (!queue.waitUntilEmpty(deadline)) {
+throw new TimeoutException(s"The event queue is not empty after 
$timeoutMillis ms.")
   }
-  /* Sleep rather than using wait/notify, because this is used only 
for testing and
-   * wait/notify add overhead in the general case. */
-  Thread.sleep(10)
 }
   }
 
   /**
-   * For testing only. Return whether the listener daemon thread is still 
alive.
-   * Exposed for testing.
-   */
-  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once 
belonged to this queue
-   * have already been processed by all attached listeners, if this 
returns true.
-   */
-  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && 
!processingEvent }
-
-  /**
* Stop the listener bus. It will wait until the queued events have been 
processed, but drop the
* new events after stopping.
*/
   def stop(): Unit = {
 if (!started.get()) {
-  throw new IllegalStateException(s"Attempted to stop $name that has 
not yet started!")
+  throw new IllegalStateException(s"Attempted to stop bus that has not 
yet started!")
 }
-if (stopped.compareAndSet(false, true)) {
-  // Call eventLock.release() so that listenerThread will poll `null` 
from `eventQueue` and know
-  // `stop` is called.
-  eventLock.release()
-  listenerThread.join()
-} else {
-  // Keep quiet
+
+if (!stopped.compareAndSet(false, true)) {
+  return
 }
-  }
 
-  /**
-   * If the event queue exceeds its capacity, the new events will be 
dropped. The subclasses will be
-   * notified with the dropped events.
-   *
-   * Note: `onDropEvent` can be called in any thread.
-   */
-  def onDropEvent(event: SparkListenerEvent): Unit = {
-metrics.numDroppedEvents.inc()
-droppedEventsCounter.incrementAndGet()
-if (logDroppedEvent.compareAndSet(false, true)) {
-  // Only log the following message once to avoid duplicated annoying 
logs.
-  logError("Dropping SparkListenerEvent because no remaining room in 
event queue. " +
-"This likely means one of the SparkListeners is too slow and 
cannot keep up with " +
-"the rate at which tasks are being started by the scheduler.")
+synchronized {
+  queues.asScala.foreach(_.stop())
+  queues.clear()
 }
-logTrace(s"Dropping event $event")
   }
+
+  private[spark] def findListenersByClass[T <: SparkListenerInterface : 
ClassTag](): Seq[T] = {
--- End diff --

Let's add a comment saying it is for testing only.


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139593157
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -173,80 +159,62 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
*/
   @throws(classOf[TimeoutException])
   def waitUntilEmpty(timeoutMillis: Long): Unit = {
-val finishTime = System.currentTimeMillis + timeoutMillis
-while (!queueIsEmpty) {
-  if (System.currentTimeMillis > finishTime) {
-throw new TimeoutException(
-  s"The event queue is not empty after $timeoutMillis 
milliseconds")
+val deadline = System.currentTimeMillis + timeoutMillis
+queues.asScala.foreach { queue =>
+  if (!queue.waitUntilEmpty(deadline)) {
+throw new TimeoutException(s"The event queue is not empty after 
$timeoutMillis ms.")
   }
-  /* Sleep rather than using wait/notify, because this is used only 
for testing and
-   * wait/notify add overhead in the general case. */
-  Thread.sleep(10)
 }
   }
 
   /**
-   * For testing only. Return whether the listener daemon thread is still 
alive.
-   * Exposed for testing.
-   */
-  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once 
belonged to this queue
-   * have already been processed by all attached listeners, if this 
returns true.
-   */
-  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && 
!processingEvent }
-
-  /**
* Stop the listener bus. It will wait until the queued events have been 
processed, but drop the
* new events after stopping.
*/
   def stop(): Unit = {
 if (!started.get()) {
-  throw new IllegalStateException(s"Attempted to stop $name that has 
not yet started!")
+  throw new IllegalStateException(s"Attempted to stop bus that has not 
yet started!")
 }
-if (stopped.compareAndSet(false, true)) {
-  // Call eventLock.release() so that listenerThread will poll `null` 
from `eventQueue` and know
-  // `stop` is called.
-  eventLock.release()
-  listenerThread.join()
-} else {
-  // Keep quiet
+
+if (!stopped.compareAndSet(false, true)) {
+  return
 }
-  }
 
-  /**
-   * If the event queue exceeds its capacity, the new events will be 
dropped. The subclasses will be
-   * notified with the dropped events.
-   *
-   * Note: `onDropEvent` can be called in any thread.
-   */
-  def onDropEvent(event: SparkListenerEvent): Unit = {
-metrics.numDroppedEvents.inc()
-droppedEventsCounter.incrementAndGet()
-if (logDroppedEvent.compareAndSet(false, true)) {
-  // Only log the following message once to avoid duplicated annoying 
logs.
-  logError("Dropping SparkListenerEvent because no remaining room in 
event queue. " +
-"This likely means one of the SparkListeners is too slow and 
cannot keep up with " +
-"the rate at which tasks are being started by the scheduler.")
+synchronized {
+  queues.asScala.foreach(_.stop())
+  queues.clear()
 }
-logTrace(s"Dropping event $event")
   }
+
+  private[spark] def findListenersByClass[T <: SparkListenerInterface : 
ClassTag](): Seq[T] = {
+queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
+  }
+
+  private[spark] def listeners: JList[SparkListenerInterface] = {
--- End diff --

where do we use this method?


---




[GitHub] spark pull request #19265: [SPARK-22047][flaky test] HiveExternalCatalogVers...

2017-09-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19265


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19265
  
looks very stable, I'm merging it to see if it works.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139592201
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-def _udf(f, returnType=StringType()):
-udf_obj = UserDefinedFunction(f, returnType)
-return udf_obj._wrapped()
+return _create_udf(f, returnType=returnType, vectorized=False)
 
-# decorator @udf, @udf() or @udf(dataType())
-if f is None or isinstance(f, (str, DataType)):
-# If DataType has been passed as a positional argument
-# for decorator use it as a returnType
-return_type = f or returnType
-return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+"""
+Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
+`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+
+:param f: python function if used as a standalone function
+:param returnType: a :class:`pyspark.sql.types.DataType` object
+
+# TODO: doctest
+"""
+import inspect
+# If function "f" does not define the optional kwargs, then wrap with 
a kwargs placeholder
+if inspect.getargspec(f).keywords is None:
+return _create_udf(lambda *a, **kwargs: f(*a), 
returnType=returnType, vectorized=True)
--- End diff --

How about disallowing it for now? I think that could be an option if 
0-parameter UDFs alone cannot be supported consistently. 
`return pd.Series(1).repeat(kwargs['length'])` still looks a little bit weird.
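
For reference, a minimal Spark-free sketch of the wrapping trick in the quoted diff and of the 0-parameter case under discussion. Assumptions: `with_kwargs_placeholder`, `plus_one`, and `constant_column` are hypothetical names, plain lists stand in for `pandas.Series`, and `inspect.getfullargspec(f).varkw` is used as the Python 3 counterpart of the `inspect.getargspec(f).keywords` check in the diff.

```python
import inspect


def with_kwargs_placeholder(f):
    """Return f unchanged if it declares **kwargs; otherwise wrap it so that
    extra keyword arguments (such as a batch length) are silently dropped."""
    if inspect.getfullargspec(f).varkw is not None:
        return f
    return lambda *a, **kwargs: f(*a)


def plus_one(xs):
    # Ordinary case: the input column already determines the output length.
    return [x + 1 for x in xs]


def constant_column(**kwargs):
    # 0-parameter case: with no input column, the output length has to come
    # from an out-of-band keyword argument.
    return [1] * kwargs["length"]


wrapped_plus_one = with_kwargs_placeholder(plus_one)
wrapped_constant = with_kwargs_placeholder(constant_column)

# The caller always passes length=...; only functions that declare **kwargs see it.
result_plus_one = wrapped_plus_one([1, 2, 3], length=3)
result_constant = wrapped_constant(length=3)
```

The 0-parameter function is the only one that has to read `kwargs["length"]`, which is the part that looks odd in the comment above.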


---




[GitHub] spark issue #19273: Revert "[SPARK-21428] Turn IsolatedClientLoader off whil...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19273
  
**[Test build #81908 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81908/testReport)** for PR 19273 at commit [`72c5a8e`](https://github.com/apache/spark/commit/72c5a8e1474915422f154ac29c9dbe472081b3d6).


---




[GitHub] spark pull request #19273: Revert "[SPARK-21428] Turn IsolatedClientLoader o...

2017-09-18 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/19273

Revert "[SPARK-21428] Turn IsolatedClientLoader off while using builtin 
Hive jars for reusing CliSessionState"

This reverts commit b83b502c4189c571bda776511c6f7541c6067aae.

It causes a regression, as reported in 
https://github.com/apache/spark/pull/19068#issuecomment-328260611 . The fix 
does not seem trivial, so we should revert this commit first.

This revert has some conflicts, so I am running it through Jenkins to avoid 
messing something up.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark revert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19273.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19273


commit 72c5a8e1474915422f154ac29c9dbe472081b3d6
Author: Wenchen Fan 
Date:   2017-09-19T03:46:11Z

Revert "[SPARK-21428] Turn IsolatedClientLoader off while using builtin 
Hive jars for reusing CliSessionState"

This reverts commit b83b502c4189c571bda776511c6f7541c6067aae.




---




[GitHub] spark issue #19273: Revert "[SPARK-21428] Turn IsolatedClientLoader off whil...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19273
  
cc @yaooqinn @jiangxb1987 


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19068
  
**[Test build #81906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81906/testReport)** for PR 19068 at commit [`b21fc72`](https://github.com/apache/spark/commit/b21fc7222c313eb570f788afb5b02a9a67c881a4).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19068
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19068
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81906/
Test FAILed.


---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19271
  
**[Test build #81907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81907/testReport)** for PR 19271 at commit [`94b63fb`](https://github.com/apache/spark/commit/94b63fb4082c98ff4f2e67e0759e98f46a3537a5).


---




[GitHub] spark pull request #16656: [SPARK-18116][DStream] Report stream input inform...

2017-09-18 Thread uncleGen
Github user uncleGen closed the pull request at:

https://github.com/apache/spark/pull/16656


---




[GitHub] spark issue #19068: [SPARK-21428][SQL][FOLLOWUP]CliSessionState should point...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19068
  
**[Test build #81906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81906/testReport)** for PR 19068 at commit [`b21fc72`](https://github.com/apache/spark/commit/b21fc7222c313eb570f788afb5b02a9a67c881a4).


---




[GitHub] spark issue #19247: [Spark-21996][SQL] read files with space in name for str...

2017-09-18 Thread xysun
Github user xysun commented on the issue:

https://github.com/apache/spark/pull/19247
  
@joseph-torres @brkyvz @lw-lin could you please take a look? (Sorry for the 
uninvited mentions; I just picked the authors of the latest commits on `FileStreamSource`.)


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/17819
  
@viirya Oh, I am not talking about compatibility with Scala applications 
written against the old version. I mean that with the new version of 
`Bucketizer`, when a Spark user writes Java (not Scala) and calls a newly added 
API (such as `setInputSplits`), it does not work; i.e., the new API isn't 
Java-compatible. For example:
```
public class JavaBucketizerExample {
  public static void main(String[] args) {
    Bucketizer bucketizer = new Bucketizer().setInputCols(new String[] {"features", "features2"});
    ...
  }
}
```
The above Java program (note: not a Scala program) does not work. It does not 
even compile.


---




[GitHub] spark pull request #19208: [SPARK-21087] [ML] CrossValidator, TrainValidatio...

2017-09-18 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19208#discussion_r139586600
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala ---
@@ -276,12 +315,32 @@ object TrainValidationSplitModel extends MLReadable[TrainValidationSplitModel] {
 
     ValidatorParams.validateParams(instance)
 
+    protected var shouldPersistSubModels: Boolean = false
+
+    /**
+     * Set option for persist sub models.
+     */
+    @Since("2.3.0")
+    def persistSubModels(persist: Boolean): this.type = {
+      shouldPersistSubModels = persist
+      this
+    }
+
     override protected def saveImpl(path: String): Unit = {
       import org.json4s.JsonDSL._
-      val extraMetadata = "validationMetrics" -> instance.validationMetrics.toSeq
+      val extraMetadata = ("validationMetrics" -> instance.validationMetrics.toSeq) ~
+        ("shouldPersistSubModels" -> shouldPersistSubModels)
       ValidatorParams.saveImpl(path, instance, sc, Some(extraMetadata))
       val bestModelPath = new Path(path, "bestModel").toString
       instance.bestModel.asInstanceOf[MLWritable].save(bestModelPath)
+      if (shouldPersistSubModels) {
+        require(instance.subModels != null, "Cannot get sub models to persist.")
+        val subModelsPath = new Path(path, "subModels")
+        for (paramIndex <- 0 until instance.getEstimatorParamMaps.length) {
+          val modelPath = new Path(subModelsPath, paramIndex.toString).toString
+          instance.subModels(paramIndex).asInstanceOf[MLWritable].save(modelPath)
--- End diff --

@WeichenXu123 Actually I don't think we have to worry about this; Pipeline 
persistence doesn't clean up if a stage fails to persist (see 
[Pipeline.scala](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala#L242))


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585713
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
         return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
+        return super(ArrowPandasSerializer, self).dumps(batch)
+
+    def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        followed by a dictionary containing length of the loaded batches.
+        """
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
+        # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
+        num_rows = sum([batch.num_rows for batch in batches])
--- End diff --

I'd use a generator expression here too.
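
To illustrate the suggestion, here is a minimal sketch. `Batch` is a hypothetical stand-in for a pyarrow record batch (only its `num_rows` attribute matters here); it is not part of the PR. Passing a generator expression to `sum` avoids building the intermediate list:

```python
from collections import namedtuple

# Hypothetical stand-in for a pyarrow RecordBatch; only num_rows is used.
Batch = namedtuple("Batch", ["num_rows"])
batches = [Batch(3), Batch(0), Batch(5)]

# List comprehension: materializes a temporary list before summing.
total_with_list = sum([batch.num_rows for batch in batches])

# Generator expression: feeds values to sum() one at a time.
total_with_generator = sum(batch.num_rows for batch in batches)
```

Both forms give the same total; the difference is only in the temporary list.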


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585787
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
         return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
+        return super(ArrowPandasSerializer, self).dumps(batch)
+
+    def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        followed by a dictionary containing length of the loaded batches.
+        """
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
--- End diff --

And `xrange` here too.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585473
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
         return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

I'd use a generator expression.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585376
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
         return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
--- End diff --

I'd use `xrange`.
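
For context, a minimal sketch of why `xrange` matters and how it can be kept portable. In Python 2, `range` builds a full list while `xrange` is lazy; in Python 3, `range` is already lazy and `xrange` no longer exists. The alias below is one common way to bridge the two and is an assumption about how the module would handle it, not something shown in this diff:

```python
import sys

if sys.version_info[0] >= 3:
    # Python 3: range is lazy already, so alias it under the Python 2 name.
    xrange = range

# Column names generated the same way as in the diff, but with xrange.
arrs = ["a", "b", "c"]  # placeholder values standing in for Arrow arrays
names = ["_%d" % i for i in xrange(len(arrs))]
```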


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/18659
  
@BryanCutler I'm OK with upgrading pyarrow to 0.7, apart from the same 
concerns as in #18974.
I guess we need to discuss an upgrade policy and strategy for pyarrow.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81902/
Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19265
  
**[Test build #81902 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81902/testReport)**
 for PR 19265 at commit 
[`d395780`](https://github.com/apache/spark/commit/d3957809ad2bf720686fe6068b2cb6d530bf6845).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19130: [SPARK-21917][CORE][YARN] Supporting adding http(s) reso...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19130
  
**[Test build #81905 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81905/testReport)**
 for PR 19130 at commit 
[`580d587`](https://github.com/apache/spark/commit/580d5872bfa46c7c7b2d37ce6c4cf4568dc32050).


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81901/
Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19265
  
**[Test build #81901 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81901/testReport)**
 for PR 19265 at commit 
[`d395780`](https://github.com/apache/spark/commit/d3957809ad2bf720686fe6068b2cb6d530bf6845).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139583530
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
         self.assertTrue(pdf.empty)
 
 
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
+class VectorizedUDFTests(ReusedPySparkTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        ReusedPySparkTestCase.setUpClass()
+        cls.spark = SparkSession(cls.sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        ReusedPySparkTestCase.tearDownClass()
+        cls.spark.stop()
+
+    def test_vectorized_udf_basic(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10).select(
+            col('id').cast('string').alias('str'),
+            col('id').cast('int').alias('int'),
+            col('id').alias('long'),
+            col('id').cast('float').alias('float'),
+            col('id').cast('double').alias('double'),
+            col('id').cast('boolean').alias('bool'))
+        f = lambda x: x
+        str_f = pandas_udf(f, StringType())
+        int_f = pandas_udf(f, IntegerType())
+        long_f = pandas_udf(f, LongType())
+        float_f = pandas_udf(f, FloatType())
+        double_f = pandas_udf(f, DoubleType())
+        bool_f = pandas_udf(f, BooleanType())
+        res = df.select(str_f(col('str')), int_f(col('int')),
+                        long_f(col('long')), float_f(col('float')),
+                        double_f(col('double')), bool_f(col('bool')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_boolean(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(True,), (True,), (None,), (False,)]
+        schema = StructType().add("bool", BooleanType())
+        df = self.spark.createDataFrame(data, schema)
+        bool_f = pandas_udf(lambda x: x, BooleanType())
+        res = df.select(bool_f(col('bool')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_byte(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("byte", ByteType())
+        df = self.spark.createDataFrame(data, schema)
+        byte_f = pandas_udf(lambda x: x, ByteType())
+        res = df.select(byte_f(col('byte')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_short(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("short", ShortType())
+        df = self.spark.createDataFrame(data, schema)
+        short_f = pandas_udf(lambda x: x, ShortType())
+        res = df.select(short_f(col('short')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_int(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("int", IntegerType())
+        df = self.spark.createDataFrame(data, schema)
+        int_f = pandas_udf(lambda x: x, IntegerType())
+        res = df.select(int_f(col('int')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_long(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("long", LongType())
+        df = self.spark.createDataFrame(data, schema)
+        long_f = pandas_udf(lambda x: x, LongType())
+        res = df.select(long_f(col('long')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_float(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(3.0,), (5.0,), (-1.0,), (None,)]
+        schema = StructType().add("float", FloatType())
+        df = self.spark.createDataFrame(data, schema)
+        float_f = pandas_udf(lambda x: x, FloatType())
+        res = df.select(float_f(col('float')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_double(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(3.0,), (5.0,), (-1.0,), (None,)]
+        schema = StructType().add("double", DoubleType())
+        df = self.spark.createDataFrame(data, schema)
+        double_f = pandas_udf(lambda x: x, DoubleType())
+        res = df.select(double_f(col('double')))

[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139580569
  
--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
         return lambda *a: f(*a)
 
 
-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
+                               % (kwargs["length"], len(result)))
+        return result, toArrowType(return_type)
--- End diff --

Can we move `toArrowType(return_type)` out of `verify_result_length` to 
avoid calculating it for each block?
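
A minimal sketch of what that could look like, under stated assumptions: `to_arrow_type` is a hypothetical stand-in for `toArrowType` that just records how often it is called, and the `return verify_result_length` line is assumed since the diff excerpt stops before it. The conversion runs once when the UDF is wrapped and the closure reuses the result for every block:

```python
conversion_calls = []


def to_arrow_type(return_type):
    # Hypothetical stand-in for toArrowType; records each invocation.
    conversion_calls.append(return_type)
    return "arrow:" + return_type


def wrap_pandas_udf(f, return_type):
    # Computed once per wrapped UDF instead of once per block.
    arrow_return_type = to_arrow_type(return_type)

    def verify_result_length(*a):
        kwargs = a[-1]
        result = f(*a[:-1], **kwargs)
        if len(result) != kwargs["length"]:
            raise RuntimeError("Result vector from pandas_udf was not the required length: "
                               "expected %d, got %d" % (kwargs["length"], len(result)))
        return result, arrow_return_type

    return verify_result_length


# Apply the wrapped UDF to two blocks; the type conversion still ran only once.
wrapped = wrap_pandas_udf(lambda x, **kwargs: x, "long")
outputs = [wrapped(block, {"length": len(block)}) for block in ([1, 2, 3], [4, 5])]
```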


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139579800
  
--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
         return lambda *a: f(*a)
 
 
-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
--- End diff --

typo: `kwarg['length']` -> `kwargs['length']`


---




[GitHub] spark issue #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19243
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19243
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81903/
Test PASSed.


---




[GitHub] spark issue #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19243
  
**[Test build #81903 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81903/testReport)**
 for PR 19243 at commit 
[`287f2b4`](https://github.com/apache/spark/commit/287f2b4c0d956de145526402466a0818b2d3e15c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19210: [SPARK-22030][CORE] GraphiteSink fails to re-conn...

2017-09-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19210


---




[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...

2017-09-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19266#discussion_r139582421
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -39,7 +39,7 @@
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 2.1 billion elements";
--- End diff --

Do we need the same assert at the location below?
 
https://github.com/apache/spark/blob/c66d64b3df9d9ffba0b16a62015680f6f876fc68/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java#L53


---




[GitHub] spark issue #19210: [SPARK-22030][CORE] GraphiteSink fails to re-connect to ...

2017-09-18 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19210
  
LGTM, merging to master.


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-18 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17819
  
Sorry, I have to reply from a phone, so I can't write code easily.

What I mean by saying it doesn't break binary compatibility is that existing
user code using Bucketizer doesn't need to be recompiled. If you want to use
the new inputCols features, of course you need to recompile your code. I think
this is what binary compatibility is about, isn't it?



On Sep 19, 2017 9:13 AM, "WeichenXu"  wrote:

@viirya  Scala's `with` trait mixin is a complex mechanism, and a trait isn't
equivalent to a Java interface. The Scala compiler precompiles and generates
a lot of additional code, so Java-side code cannot directly call methods that
come from a `with` trait. To test this, you can modify this file and add a
line:

--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java
@@ -59,7 +59,8 @@ public class JavaBucketizerExample {
     Bucketizer bucketizer = new Bucketizer()
       .setInputCol("features")
       .setOutputCol("bucketedFeatures")
-      .setSplits(splits);
+      .setSplits(splits)
+      .setInputCols(new String[] {"features", "features2"});

     // Transform original data into its bucket index.
     Dataset bucketedData = bucketizer.transform(dataFrame);

and you will find that this Java class fails to compile.




---




[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...

2017-09-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19266#discussion_r139580567
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -39,7 +39,7 @@
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 2.1 billion elements";
--- End diff --

ISTM `memory.size() < (long) Integer.MAX_VALUE * 8` is correct? see: 
https://github.com/apache/spark/blob/c66d64b3df9d9ffba0b16a62015680f6f876fc68/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java#L54
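
As a quick sanity check on the numbers in the assertion message, the arithmetic can be sketched in Python. This is only an illustration; `INT_MAX`, `BYTES_PER_LONG`, and `fits_long_array` are made-up names, not part of Spark:

```python
INT_MAX = 2**31 - 1        # value of Java's Integer.MAX_VALUE
BYTES_PER_LONG = 8         # a Java long occupies 8 bytes

# Byte-size bound used by the assertion: Integer.MAX_VALUE * 8.
max_bytes = INT_MAX * BYTES_PER_LONG


def fits_long_array(size_in_bytes):
    # Mirrors the condition memory.size() < (long) Integer.MAX_VALUE * 8.
    return size_in_bytes < max_bytes


# Element count that corresponds to the byte bound, in billions.
max_elements_in_billions = INT_MAX / 1e9
```

The bound corresponds to roughly 2.1 billion 8-byte elements, which matches the updated message rather than the old "4 billion" wording.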


---




[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/18945
  
gentle ping @logannc.


---




[GitHub] spark issue #19208: [SPARK-21087] [ML] CrossValidator, TrainValidationSplit ...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19208
  
@smurching Thanks! I will update later. Note that I will split part of this 
PR into a new PR (the split-out part will be a bugfix for #16774).


---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19271
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81900/
Test FAILed.


---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19271
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19271
  
**[Test build #81900 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81900/testReport)**
 for PR 19271 at commit 
[`9edaa58`](https://github.com/apache/spark/commit/9edaa580edc6e599b01033190a9ef383ed001ae9).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15544
  
**[Test build #81904 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81904/testReport)**
 for PR 15544 at commit 
[`0e9a8dd`](https://github.com/apache/spark/commit/0e9a8dd4004c3331271105e4e9a3df963c3bc84b).


---




[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-09-18 Thread fjh100456
Github user fjh100456 commented on the issue:

https://github.com/apache/spark/pull/19218
  
Thanks for your review, @gatorsmile.
In the first question, I mean that `parquet.compression` can be found in 
`table: TableDesc` (maybe similar to `CatalogTable`) and can also be found in 
`sparkSession.sessionState.conf` (set by the user through the command `set 
parquet.compression=xxx`). Which one should take precedence?

This issue was originally related only to Hive table writing, but after 
fixing the priority I found that non-partitioned tables did not get the right 
precedence, and writes to non-partitioned tables do not go through 
`InsertIntoHiveTable`. `InsertIntoHadoopFsRelationCommand.scala` is really not 
a proper place; is there any place that can handle both partitioned and 
non-partitioned tables?


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/15544
  
retest this please


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18659
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18659
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81899/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19208: [SPARK-21087] [ML] CrossValidator, TrainValidatio...

2017-09-18 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19208#discussion_r139578700
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala ---
@@ -276,12 +315,32 @@ object TrainValidationSplitModel extends MLReadable[TrainValidationSplitModel] {
 
     ValidatorParams.validateParams(instance)
 
+    protected var shouldPersistSubModels: Boolean = false
+
+    /**
+     * Set option for persist sub models.
+     */
+    @Since("2.3.0")
+    def persistSubModels(persist: Boolean): this.type = {
+      shouldPersistSubModels = persist
+      this
+    }
+
     override protected def saveImpl(path: String): Unit = {
       import org.json4s.JsonDSL._
-      val extraMetadata = "validationMetrics" -> instance.validationMetrics.toSeq
+      val extraMetadata = ("validationMetrics" -> instance.validationMetrics.toSeq) ~
+        ("shouldPersistSubModels" -> shouldPersistSubModels)
       ValidatorParams.saveImpl(path, instance, sc, Some(extraMetadata))
       val bestModelPath = new Path(path, "bestModel").toString
       instance.bestModel.asInstanceOf[MLWritable].save(bestModelPath)
+      if (shouldPersistSubModels) {
+        require(instance.subModels != null, "Cannot get sub models to persist.")
+        val subModelsPath = new Path(path, "subModels")
+        for (paramIndex <- 0 until instance.getEstimatorParamMaps.length) {
+          val modelPath = new Path(subModelsPath, paramIndex.toString).toString
+          instance.subModels(paramIndex).asInstanceOf[MLWritable].save(modelPath)
--- End diff --

Should we clean up/remove the partially persisted `subModels` if any of 
these `save()` calls fails? E.g. say we have four sub-models and the first 
three `save()` calls succeed but the fourth fails: should we delete the 
folders for the first three sub-models?
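
One way the cleanup asked about above could look is sketched here. This is only an illustration under stated assumptions: it uses the local filesystem through `java.io.File` rather than the Hadoop `Path`/`FileSystem` API used in the PR, and `SubModelSaveSketch`, `saveAllOrNothing` and `deleteRecursively` are hypothetical names that do not exist in Spark.

```scala
import java.io.File
import scala.util.control.NonFatal

// Hypothetical helper, not part of Spark: illustrates all-or-nothing saving.
object SubModelSaveSketch {

  /** Deletes `f` and everything below it; does nothing if `f` does not exist. */
  def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for non-directories, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  /**
   * Calls `save(index, dir)` once per sub-model. If any call throws, the whole
   * `subModelsDir` is removed so that no partial output is left behind, and the
   * original exception is rethrown to the caller.
   */
  def saveAllOrNothing(subModelsDir: File, count: Int)(save: (Int, File) => Unit): Unit = {
    try {
      for (i <- 0 until count) save(i, new File(subModelsDir, i.toString))
    } catch {
      case NonFatal(e) =>
        deleteRecursively(subModelsDir)
        throw e
    }
  }
}
```

Whether deleting the earlier folders is the right behaviour is exactly the open question in the comment; the sketch only shows that the cleanup itself is cheap to express.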


---




[GitHub] spark pull request #19208: [SPARK-21087] [ML] CrossValidator, TrainValidatio...

2017-09-18 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19208#discussion_r139573779
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -261,17 +290,40 @@ class CrossValidatorModel private[ml] (
 val copied = new CrossValidatorModel(
   uid,
   bestModel.copy(extra).asInstanceOf[Model[_]],
-  avgMetrics.clone())
+  avgMetrics.clone(),
+  CrossValidatorModel.copySubModels(subModels))
 copyValues(copied, extra).setParent(parent)
   }
 
   @Since("1.6.0")
   override def write: MLWriter = new CrossValidatorModel.CrossValidatorModelWriter(this)
+
+  @Since("2.3.0")
+  @throws[IOException]("If the input path already exists but overwrite is not enabled.")
+  def save(path: String, persistSubModels: Boolean): Unit = {
+write.asInstanceOf[CrossValidatorModel.CrossValidatorModelWriter]
+  .persistSubModels(persistSubModels).save(path)
+  }
--- End diff --

I think users can still access `CrossValidatorModelWriter` through 
`CrossValidatorModel.write`, so the `save` method is unnecessary. 

The `private[CrossValidatorModel]` annotation on the 
`CrossValidatorModelWriter` constructor only means that users can't create 
instances of the class e.g. via `new 
CrossValidatorModel.CrossValidatorModelWriter(...)`
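
The access rule described above can be illustrated with a toy example. `ToyModel` and `ToyWriter` are hypothetical stand-ins, not Spark classes, and unlike the real `write` (declared as returning `MLWriter`, so callers would need a cast) this sketch returns the concrete writer type to keep it short. The `save` here performs no I/O and just returns a description.

```scala
// Hypothetical stand-ins for CrossValidatorModel and its writer.
class ToyModel {
  // Allowed: the companion class may call the private[ToyModel] constructor.
  def write: ToyModel.ToyWriter = new ToyModel.ToyWriter(this)
}

object ToyModel {
  // The qualifier restricts only the constructor; the class and its methods stay public.
  final class ToyWriter private[ToyModel] (instance: ToyModel) {
    private var shouldPersistSubModels = false

    def persistSubModels(persist: Boolean): this.type = {
      shouldPersistSubModels = persist
      this
    }

    // Stand-in for the real save: returns a summary instead of writing files.
    def save(path: String): String =
      s"save to $path (subModels=$shouldPersistSubModels)"
  }
}

object WriterAccessDemo {
  // User code cannot write `new ToyModel.ToyWriter(...)`, but can still do this:
  def run(): String = new ToyModel().write.persistSubModels(true).save("/tmp/toy")
}
```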


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18659
  
**[Test build #81899 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81899/testReport)** for PR 18659 at commit [`d49a3db`](https://github.com/apache/spark/commit/d49a3dbbe4e5952e73259309424c201537971bb9).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19208: [SPARK-21087] [ML] CrossValidator, TrainValidatio...

2017-09-18 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19208#discussion_r139556318
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala ---
@@ -82,7 +82,10 @@ private[shared] object SharedParamsCodeGen {
 "all instance weights as 1.0"),
   ParamDesc[String]("solver", "the solver algorithm for optimization", finalFields = false),
   ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
-isValid = "ParamValidators.gtEq(2)", isExpertParam = true))
+isValid = "ParamValidators.gtEq(2)", isExpertParam = true),
+  ParamDesc[Boolean]("collectSubModels", "whether to collect sub models when tuning fitting",
--- End diff --

Suggestion: reword "whether to collect sub models when tuning fitting" --> 
"whether to collect a list of sub-models trained during tuning"


---




[GitHub] spark pull request #19208: [SPARK-21087] [ML] CrossValidator, TrainValidatio...

2017-09-18 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19208#discussion_r139568979
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -237,12 +251,17 @@ object CrossValidator extends MLReadable[CrossValidator] {
 class CrossValidatorModel private[ml] (
 @Since("1.4.0") override val uid: String,
 @Since("1.2.0") val bestModel: Model[_],
-@Since("1.5.0") val avgMetrics: Array[Double])
+@Since("1.5.0") val avgMetrics: Array[Double],
+@Since("2.3.0") val subModels: Array[Array[Model[_]]])
   extends Model[CrossValidatorModel] with CrossValidatorParams with MLWritable {
 
   /** A Python-friendly auxiliary constructor. */
   private[ml] def this(uid: String, bestModel: Model[_], avgMetrics: JList[Double]) = {
-this(uid, bestModel, avgMetrics.asScala.toArray)
+this(uid, bestModel, avgMetrics.asScala.toArray, null)
--- End diff --

See the earlier suggestion: use an `Option` set to `None` instead of setting 
the Array to null.


---




[GitHub] spark pull request #19208: [SPARK-21087] [ML] CrossValidator, TrainValidatio...

2017-09-18 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19208#discussion_r139557219
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -117,6 +123,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
 instr.logParams(numFolds, seed, parallelism)
 logTuningParams(instr)
 
+val collectSubModelsParam = $(collectSubModels)
+
+var subModels: Array[Array[Model[_]]] = if (collectSubModelsParam) {
--- End diff --

Perhaps use an `Option[Array[Array[Model[_]]]]` instead of setting `subModels` 
to null?
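
A minimal sketch of the `Option`-based alternative, under stated assumptions: `ToyModel`, `fitAll` and `countSaved` are hypothetical names used only for illustration, and the nested array is indexed as [fold][paramIndex], which may not match the PR's actual layout.

```scala
object SubModelsOptionSketch {
  // Hypothetical stand-in for a fitted model; not a Spark class.
  final case class ToyModel(fold: Int, paramIndex: Int)

  /** Returns Some(models) when collection is enabled and None otherwise, never null. */
  def fitAll(
      numFolds: Int,
      numParamMaps: Int,
      collectSubModels: Boolean): Option[Array[Array[ToyModel]]] = {
    if (collectSubModels) {
      Some(Array.tabulate(numFolds, numParamMaps)((fold, p) => ToyModel(fold, p)))
    } else {
      None
    }
  }

  /** Callers handle the "not collected" case through the type, with no null check. */
  def countSaved(subModels: Option[Array[Array[ToyModel]]]): Int =
    subModels.map(_.map(_.length).sum).getOrElse(0)
}
```

With this shape, a check such as `require(instance.subModels != null, ...)` would become a match on `Some`/`None`, so forgetting the check becomes a compile-time concern rather than a `NullPointerException`.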


---




[GitHub] spark issue #19243: [SPARK-21780][R] Simpler Dataset.sample API in R

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19243
  
**[Test build #81903 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81903/testReport)** for PR 19243 at commit [`287f2b4`](https://github.com/apache/spark/commit/287f2b4c0d956de145526402466a0818b2d3e15c).


---




[GitHub] spark issue #19074: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading ...

2017-09-18 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19074
  
@loneknightpy did you open a new JIRA for this issue?

AFAIK, downloading resources to local disk has never been supported in 
cluster mode, even from the beginning. Would you please elaborate on the 
issue in JIRA? Thanks!


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139578185
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => AllTuples,
--- End diff --

nit: add `requiredDistribution = ` and `expectedPartitioning = ` for 
greater readability
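
The nit above can be illustrated with a small self-contained sketch. The parameter names `requiredDistribution` and `expectedPartitioning` come from the comment; everything else (`NamedArgsSketch`, `checkPartitioning`, the use of plain strings instead of Spark's `Distribution` and `Partitioning` types) is a hypothetical simplification.

```scala
object NamedArgsSketch {
  // Hypothetical stand-in for the suite's helper; strings replace Spark's plan types.
  def checkPartitioning(
      testName: String,
      requiredDistribution: Seq[String] => String,
      expectedPartitioning: Seq[String] => String,
      expectShuffle: Boolean): String = {
    val keys = Seq("num")
    s"$testName: ${requiredDistribution(keys)} -> ${expectedPartitioning(keys)}, " +
      s"shuffle=$expectShuffle"
  }

  // With names, two adjacent function arguments of the same type cannot be confused.
  val summary: String = checkPartitioning(
    "AllTuples with coalesce(1)",
    requiredDistribution = _ => "AllTuples",
    expectedPartitioning = _ => "SinglePartition",
    expectShuffle = false)
}
```

Because both function parameters have the same type, swapping them positionally would still compile; naming them at the call site is what makes such a mistake visible.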


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139578096
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = false)
+
+  private def testEnsureStatefulOpPartitioning(
--- End diff --

nit: add some docs specifying what this tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


