[GitHub] spark pull request #21050: [SPARK-23912][SQL]add array_distinct

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21050#discussion_r180971503
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -287,3 +287,29 @@ case class ArrayContains(left: Expression, right: Expression)
 
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * Removes duplicate values from the array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array) - Removes duplicate values from the array.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3, null, 3));
+   [1,2,3,null]
+ """, since = "2.4.0")
--- End diff --

indentation .. 
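
For reference, a consistently indented version of that block might look like the sketch below (illustrative only; the exact convention should follow the neighboring expressions in collectionOperations.scala):

```scala
@ExpressionDescription(
  usage = "_FUNC_(array) - Removes duplicate values from the array.",
  examples = """
    Examples:
      > SELECT _FUNC_(array(1, 2, 3, null, 3));
       [1,2,3,null]
  """,
  since = "2.4.0")
```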


---




[GitHub] spark pull request #21050: [SPARK-23912][SQL]add array_distinct

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21050#discussion_r180971591
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala ---
@@ -105,4 +105,18 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
 checkEvaluation(ArrayContains(a3, Literal("")), null)
 checkEvaluation(ArrayContains(a3, Literal.create(null, StringType)), null)
   }
+
+  test("Array Unique") {
+    val a0 = Literal.create(Seq(2, 1, 2, 3, 4, 4, 5), ArrayType(IntegerType))
+    val a1 = Literal.create(Seq[Integer](), ArrayType(IntegerType))
--- End diff --

`Seq.empty[Integer]`
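
i.e., the line would presumably become:

```scala
val a1 = Literal.create(Seq.empty[Integer], ArrayType(IntegerType))
```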


---




[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20874
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89231/
Test FAILed.


---




[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20874
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20874
  
**[Test build #89231 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89231/testReport)** for PR 20874 at commit [`9ef19df`](https://github.com/apache/spark/commit/9ef19dfcde9dc84f494bff5f03a56db840741496).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20998
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20998
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89228/
Test PASSed.


---




[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20998
  
**[Test build #89228 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89228/testReport)** for PR 20998 at commit [`5901728`](https://github.com/apache/spark/commit/5901728d6be8ad33e39d56006a2bc8cc02cfff38).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21046: [SPARK-23955][PYSPARK]Typo in classification.py

2018-04-11 Thread codeforfun15
Github user codeforfun15 closed the pull request at:

https://github.com/apache/spark/pull/21046


---




[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21050
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2239/
Test PASSed.


---




[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21050
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21050
  
**[Test build #89236 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89236/testReport)** for PR 21050 at commit [`3bfb23e`](https://github.com/apache/spark/commit/3bfb23ed6bfe0cefe9ed71f24678123bb57e3ce2).


---




[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21049
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89234/
Test FAILed.


---




[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21049
  
**[Test build #89234 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89234/testReport)** for PR 21049 at commit [`bb992c2`](https://github.com/apache/spark/commit/bb992c2058863322a9183b2985806a87729e4168).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...

2018-04-11 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21049#discussion_r180964957
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove [[Sort]] in subqueries that do not affect the set of rows produced, only their
+ * order. Subqueries produce unordered sets of rows so sorting their output is unnecessary.
+ */
+object RemoveSubquerySorts extends Rule[LogicalPlan] {
+
+  /**
+   * Removes all [[Sort]] operators from a plan that are accessible from the root operator via
+   * 0 or more [[Project]], [[Filter]] or [[View]] operators.
+   */
+  private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case Sort(_, _, child) => removeTopLevelSorts(child)
+      case Project(fields, child) => Project(fields, removeTopLevelSorts(child))
+      case Filter(condition, child) => Filter(condition, removeTopLevelSorts(child))
+      case View(tbl, output, child) => View(tbl, output, removeTopLevelSorts(child))
+      case _ => plan
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Subquery(child) => Subquery(removeTopLevelSorts(child))
+    case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child))
--- End diff --

Thanks! I've been trying to understand the role of `Subquery` and 
`SubqueryAlias`. My confusion is that subqueries do seem to get planned as 
`SubqueryAlias` operators, e.g.:

scala> spark.sql("SELECT count(*) from (SELECT id FROM dft ORDER BY id)").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('count(1), None)]
+- 'SubqueryAlias __auto_generated_subquery_name
   +- 'Sort ['id ASC NULLS FIRST], true
      +- 'Project ['id]
         +- 'UnresolvedRelation `dft`

In the example you give I (personally) think it's still reasonable to drop 
the ordering, but understand that might surprise some users. It wouldn't be 
hard to skip the root if it's a subquery - but what do you propose for 
detecting subqueries if my method isn't right?


---




[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21049
  
Merged build finished. Test FAILed.


---




[GitHub] spark pull request #21050: [SPARK-23912][SQL]add array_distinct

2018-04-11 Thread huaxingao
GitHub user huaxingao opened a pull request:

https://github.com/apache/spark/pull/21050

[SPARK-23912][SQL]add array_distinct

## What changes were proposed in this pull request?

Add array_distinct to remove duplicate values from the array.

## How was this patch tested?

Add unit tests
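
As a quick illustration of the intended behavior (taken from the example in the expression's documentation; the function is targeted at Spark 2.4.0, so this is hypothetical until the PR lands):

```scala
// Duplicate 3 is removed; null is kept as a distinct value.
spark.sql("SELECT array_distinct(array(1, 2, 3, null, 3))").collect()
// expected: [1, 2, 3, null]
```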


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huaxingao/spark spark-23912

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21050


commit 3bfb23ed6bfe0cefe9ed71f24678123bb57e3ce2
Author: Huaxin Gao 
Date:   2018-04-12T04:52:25Z

[SPARK-23912][SQL]add array_distinct




---




[GitHub] spark issue #21009: [SPARK-23905][SQL] Add UDF weekday

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21009
  
**[Test build #89235 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89235/testReport)** for PR 21009 at commit [`363ff1c`](https://github.com/apache/spark/commit/363ff1cd83c27f4ad85c3231910a9da51922aaaf).


---




[GitHub] spark issue #21009: [SPARK-23905][SQL] Add UDF weekday

2018-04-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21009
  
retest this please


---




[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...

2018-04-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21049#discussion_r180963692
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove [[Sort]] in subqueries that do not affect the set of rows produced, only their
+ * order. Subqueries produce unordered sets of rows so sorting their output is unnecessary.
+ */
+object RemoveSubquerySorts extends Rule[LogicalPlan] {
+
+  /**
+   * Removes all [[Sort]] operators from a plan that are accessible from the root operator via
+   * 0 or more [[Project]], [[Filter]] or [[View]] operators.
+   */
+  private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case Sort(_, _, child) => removeTopLevelSorts(child)
+      case Project(fields, child) => Project(fields, removeTopLevelSorts(child))
+      case Filter(condition, child) => Filter(condition, removeTopLevelSorts(child))
+      case View(tbl, output, child) => View(tbl, output, removeTopLevelSorts(child))
+      case _ => plan
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Subquery(child) => Subquery(removeTopLevelSorts(child))
+    case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child))
--- End diff --

`SubqueryAlias` is not the subquery you want. This is just an alias of a 
query/table/view.  For example, 
```Scala
Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str_sort").orderBy('int.asc).as('df1)
```
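
One way to see this (a sketch, assuming a spark-shell session with `spark.implicits._` in scope) is to inspect the analyzed plan of such an aliased Dataset: its root is a `SubqueryAlias` node even though no SQL subquery is involved, so removing sorts under every `SubqueryAlias` would also drop this user-requested ordering:

```scala
import spark.implicits._

val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str_sort")
  .orderBy('int.asc).as("df1")
// The root of the analyzed plan is SubqueryAlias df1, wrapping the Sort.
println(df.queryExecution.analyzed)
```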


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89232/
Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21048
  
**[Test build #89232 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89232/testReport)** for PR 21048 at commit [`1b23492`](https://github.com/apache/spark/commit/1b23492017b8209b6198e6886e06a5cd9d59b9db).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21049
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21049
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2238/
Test PASSed.


---




[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21049
  
**[Test build #89234 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89234/testReport)** for PR 21049 at commit [`bb992c2`](https://github.com/apache/spark/commit/bb992c2058863322a9183b2985806a87729e4168).


---




[GitHub] spark issue #21009: [SPARK-23905][SQL] Add UDF weekday

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21009
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21009: [SPARK-23905][SQL] Add UDF weekday

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21009
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89229/
Test FAILed.


---




[GitHub] spark issue #21009: [SPARK-23905][SQL] Add UDF weekday

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21009
  
**[Test build #89229 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89229/testReport)** for PR 21009 at commit [`363ff1c`](https://github.com/apache/spark/commit/363ff1cd83c27f4ad85c3231910a9da51922aaaf).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...

2018-04-11 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/21049

[SPARK-23957][SQL] Remove redundant sort operators from subqueries

## What changes were proposed in this pull request?

Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit). This patch
adds a new optimizer rule that removes sort operators that are directly
below subqueries (or some combination of projection and filtering below
a subquery).

## How was this patch tested?

New unit tests. All sql unit tests pass.
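
As a concrete illustration (hypothetical table name `t`; the inner ORDER BY feeds an aggregate, so its ordering cannot be observed in the result):

```scala
// The subquery's Sort does not change which rows reach the outer count(*),
// so the proposed rule can remove it from the optimized plan.
spark.sql("SELECT count(*) FROM (SELECT id FROM t ORDER BY id) AS sub").explain(true)
```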


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23957

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21049.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21049


commit bb992c2058863322a9183b2985806a87729e4168
Author: Henry Robinson 
Date:   2018-04-12T03:44:36Z

[SPARK-23957][SQL] Remove redundant sort operators from subqueries

## What changes were proposed in this pull request?

Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit). This patch
adds a new optimizer rule that removes sort operators that are directly
below subqueries (or some combination of projection and filtering below
a subquery).

## How was this patch tested?

New unit tests. All sql unit tests pass.




---




[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21004
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21004
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2237/
Test PASSed.


---




[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21004
  
**[Test build #89233 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89233/testReport)** for PR 21004 at commit [`43f6b77`](https://github.com/apache/spark/commit/43f6b776088fd3cfecc9339323068d6dad6170c4).


---




[GitHub] spark pull request #19868: [SPARK-22676] Avoid iterating all partition paths...

2018-04-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19868#discussion_r180962121
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -176,12 +176,13 @@ class HadoopTableReader(
   val matches = fs.globStatus(pathPattern)
   matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
 }
-// convert  /demo/data/year/month/day  to  /demo/data/*/*/*/
+// convert  /demo/data/year/month/day  to  /demo/data/year/month/*/
--- End diff --

If the partition columns are `A/B/C/D`, then unless you specify a value for `A`, you have to check that level. The same applies to `B`, `C`, and `D`; a sketch of the idea follows below.
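
A toy sketch of the conversion being discussed (hypothetical helper, not the PR's code): keep the concrete value for each leading partition column that is specified, and glob the remaining levels.

```scala
// specified(i) = Some(value) if partition column i has a fixed value, else None.
def toPathPattern(base: String, specified: Seq[Option[String]]): String =
  base + specified.map(_.getOrElse("*")).mkString("/", "/", "/")

// toPathPattern("/demo/data", Seq(Some("2018"), Some("04"), None))
//   => "/demo/data/2018/04/*/"
```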


---




[GitHub] spark pull request #19868: [SPARK-22676] Avoid iterating all partition paths...

2018-04-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19868#discussion_r180961987
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -176,12 +176,13 @@ class HadoopTableReader(
   val matches = fs.globStatus(pathPattern)
   matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
 }
-// convert  /demo/data/year/month/day  to  /demo/data/*/*/*/
+// convert  /demo/data/year/month/day  to  /demo/data/year/month/*/
--- End diff --

Em... It seems we have to check all the levels unless we have specified a value for each partition column. We can make some improvement here, but that seems to require a more complicated approach.


---




[GitHub] spark issue #21046: [SPARK-23955][PYSPARK]Typo in classification.py

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21046
  
Yea that's fine. Let's close this one then.


---




[GitHub] spark issue #21046: [SPARK-23955][PYSPARK]Typo in classification.py

2018-04-11 Thread codeforfun15
Github user codeforfun15 commented on the issue:

https://github.com/apache/spark/pull/21046
  
Yes, my bad: neither the JIRA was updated nor the fix was merged, so I didn't know about it. Thanks @HyukjinKwon!


---




[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20345
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20345
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89225/
Test PASSed.


---




[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20345
  
**[Test build #89225 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89225/testReport)** for PR 20345 at commit [`94d9171`](https://github.com/apache/spark/commit/94d9171b8ec26c21724dd393cf4fc83ff52623e7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21004
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21004
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89224/
Test PASSed.


---




[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21004
  
**[Test build #89224 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89224/testReport)** for PR 21004 at commit [`d12efab`](https://github.com/apache/spark/commit/d12efabd34a27de53effe4d1f1d7e81cb691fc32).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89227/
Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21048
  
**[Test build #89227 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89227/testReport)** for PR 21048 at commit [`f9965f1`](https://github.com/apache/spark/commit/f9965f1cecf58b9aa79e0a746e6e16580238ef7b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20871: [SPARK-23762][SQL] UTF8StringBuffer uses MemoryBlock

2018-04-11 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20871
  
ping @cloud-fan


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89226/
Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21048
  
**[Test build #89226 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89226/testReport)** for PR 21048 at commit [`f1fc175`](https://github.com/apache/spark/commit/f1fc175a3e599c8b72a2c07dcd97bdc3d43e2092).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  sealed trait RenameHelperMethods `


---




[GitHub] spark issue #21046: [SPARK-23955][PYSPARK]Typo in classification.py

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21046
  
Isn't it fixed in 
https://github.com/apache/spark/commit/9d960de0814a1128318676cc2e91f447cdf0137f?


---




[GitHub] spark issue #21036: [SPARK-23958][CORE] HadoopRdd filters empty files to avo...

2018-04-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/21036
  
Yes, this is already supported in Spark, so it seems the PR is invalid.


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21038
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21038
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89230/
Test PASSed.


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21038
  
**[Test build #89230 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89230/testReport)** for PR 21038 at commit [`a7770e9`](https://github.com/apache/spark/commit/a7770e9b853e5357f737922dcd019058823b6812).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2236/
Test PASSed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21048
  
**[Test build #89232 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89232/testReport)** for PR 21048 at commit [`1b23492`](https://github.com/apache/spark/commit/1b23492017b8209b6198e6886e06a5cd9d59b9db).


---




[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20874
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20874
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2235/
Test PASSed.


---




[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20874
  
**[Test build #89231 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89231/testReport)** for PR 20874 at commit [`9ef19df`](https://github.com/apache/spark/commit/9ef19dfcde9dc84f494bff5f03a56db840741496).


---




[GitHub] spark issue #21037: [SPARK-23919][SQL] Add array_position function

2018-04-11 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21037
  
@bersprockets You are absolutely right. I made a mistake. I will completely reimplement this soon.


---




[GitHub] spark issue #20874: [SPARK-23763][SQL] OffHeapColumnVector uses MemoryBlock

2018-04-11 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20874
  
retest this please


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21038
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2234/
Test PASSed.


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21038
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21038
  
**[Test build #89230 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89230/testReport)** for PR 21038 at commit [`a7770e9`](https://github.com/apache/spark/commit/a7770e9b853e5357f737922dcd019058823b6812).


---




[GitHub] spark issue #21009: [SPARK-23905][SQL] Add UDF weekday

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21009
  
**[Test build #89229 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89229/testReport)** for PR 21009 at commit [`363ff1c`](https://github.com/apache/spark/commit/363ff1cd83c27f4ad85c3231910a9da51922aaaf).


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue

2018-04-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/21038
  
Thanks @koeninger, then I will just improve the exception message.
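
A sketch of what an improved message could look like (hypothetical wrapper, not this PR's code; the underlying `IllegalStateException` is what the Kafka consumer throws today):

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

def positionWithHint(consumer: Consumer[_, _], tp: TopicPartition): Long = {
  try {
    consumer.position(tp)
  } catch {
    case e: IllegalStateException =>
      // Surface the likely root cause instead of Kafka's bare
      // "no current assignment for partition" error.
      throw new IllegalStateException(
        s"No current assignment for $tp. The partition was likely revoked by a " +
          "consumer rebalance, e.g. because another application uses the same group id.", e)
  }
}
```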


---




[GitHub] spark pull request #20959: [SPARK-23846][SQL] The samplingRatio option for C...

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20959#discussion_r180943744
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1279,4 +1278,68 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
 )
   }
+
+  test("SPARK-23846: schema inferring touches less data if samplingRatio < 1.0") {
+    // Set default values for the DataSource parameters to make sure
+    // that whole test file is mapped to only one partition. This will guarantee
+    // reliable sampling of the input file.
+    withSQLConf(
+      "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
+      "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+    )(withTempPath { path =>
+      val rdd = spark.sqlContext.range(0, 100).map {row =>
--- End diff --

Can we actually do

```
val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
  57, 62, 68, 72)
val value = row.getLong(0)
if (predefinedSample.contains(value)) {
  value.toString
} else {
  (value.toDouble + 0.1).toString
}
```

outside of `map`?
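
i.e., hoisting the constant set out of the closure, roughly:

```scala
val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
  57, 62, 68, 72)
val rdd = spark.sqlContext.range(0, 100).map { row =>
  val value = row.getLong(0)
  if (predefinedSample.contains(value)) value.toString
  else (value.toDouble + 0.1).toString
}.repartition(1)
```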


---




[GitHub] spark pull request #20959: [SPARK-23846][SQL] The samplingRatio option for C...

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20959#discussion_r180942904
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1279,4 +1278,68 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
 )
   }
+
+  test("SPARK-23846: schema inferring touches less data if samplingRatio < 1.0") {
+    // Set default values for the DataSource parameters to make sure
+    // that whole test file is mapped to only one partition. This will guarantee
+    // reliable sampling of the input file.
+    withSQLConf(
+      "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
+      "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+    )(withTempPath { path =>
+      val rdd = spark.sqlContext.range(0, 100).map {row =>
+        val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
+          57, 62, 68, 72)
+        val value = row.getLong(0)
+        if (predefinedSample.contains(value)) {
+          value.toString
+        } else {
+          (value.toDouble + 0.1).toString
+        }
+      }.repartition(1)
+      rdd.write.text(path.getAbsolutePath)
+
+      val ds = spark.read
+        .option("inferSchema", true)
+        .option("samplingRatio", 0.1)
+        .csv(path.getCanonicalPath)
+      assert(ds.schema == new StructType().add("_c0", IntegerType))
+    })
+  }
+
+  test("SPARK-23846: usage of samplingRatio while parsing a dataset of strings") {
+    val dstr = spark.sparkContext.parallelize(0 until 100, 1).map { i =>
+      val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
+        57, 62, 68, 72)
+      if (predefinedSample.contains(i)) {
+        i.toString + "\n"
+      } else {
+        (i.toDouble + 0.1) + "\n"
--- End diff --

Hm .. does it need `\n`?


---




[GitHub] spark pull request #20959: [SPARK-23846][SQL] The samplingRatio option for C...

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20959#discussion_r180942841
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1279,4 +1278,68 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
 )
   }
+
+  test("SPARK-23846: schema inferring touches less data if samplingRatio < 1.0") {
+    // Set default values for the DataSource parameters to make sure
+    // that whole test file is mapped to only one partition. This will guarantee
+    // reliable sampling of the input file.
+    withSQLConf(
+      "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
+      "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+    )(withTempPath { path =>
+      val rdd = spark.sqlContext.range(0, 100).map {row =>
+        val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
+          57, 62, 68, 72)
+        val value = row.getLong(0)
+        if (predefinedSample.contains(value)) {
+          value.toString
+        } else {
+          (value.toDouble + 0.1).toString
+        }
+      }.repartition(1)
+      rdd.write.text(path.getAbsolutePath)
+
+      val ds = spark.read
+        .option("inferSchema", true)
+        .option("samplingRatio", 0.1)
+        .csv(path.getCanonicalPath)
+      assert(ds.schema == new StructType().add("_c0", IntegerType))
+    })
+  }
+
+  test("SPARK-23846: usage of samplingRatio while parsing a dataset of strings") {
+    val dstr = spark.sparkContext.parallelize(0 until 100, 1).map { i =>
+      val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
--- End diff --

Can we take this out of `map`?


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Merged build finished. Test FAILed.


---




[GitHub] spark pull request #20959: [SPARK-23846][SQL] The samplingRatio option for C...

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20959#discussion_r180942647
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3009,6 +3009,15 @@ def test_sort_with_nulls_order(self):
 
             df.select(df.name).orderBy(functions.desc_nulls_last('name')).collect(),
             [Row(name=u'Tom'), Row(name=u'Alice'), Row(name=None)])
 
+    def test_csv_sampling_ratio(self):
+        rdd = self.spark.sparkContext.range(0, 100).\
+            map(lambda x: '0.1' if x == 1 else str(x)).\
--- End diff --

`.\nmap` -> `\n.map`


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89223/
Test FAILed.


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21048
  
**[Test build #89223 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89223/testReport)** for PR 21048 at commit [`df7b339`](https://github.com/apache/spark/commit/df7b339d73097b8501fe0937f770b8b2ded1b63e).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  .doc(\"The class used to write checkpoint files atomically. This class must be a subclass \" +`
  * `trait CheckpointFileManager `
  * `  abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)`
  * `  sealed class RenameBasedFSDataOutputStream(`
  * `class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)`
  * `class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)`


---




[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue

2018-04-11 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21038
  
I can't think of a valid reason to create a configuration to allow it.  It
just fundamentally doesn't make sense to run different apps with the same
group id.

Trying to catch and rethrow the exception with more information might make
sense.

On Wed, Apr 11, 2018, 20:05 Saisai Shao  wrote:

> Thanks @koeninger  for your comments. I
> think your suggestion is valid, the log here is just pasted from JIRA, but
> we also got the same issue from customer's report.
>
> Here in the PR description, I mentioned that using two apps with same
> group id to mimic this issue. But I'm not sure the real use case from our
> customer, maybe in their scenario such usage is valid.
>
> So I'm wondering if we can add a configuration to control whether it
> should be fail or just warning. Also I think exception/warning log should
> be improved to directly tell user about consumer rebalance issue, rather
> than throwing from Kafka as "no current assignment for partition xxx".
>



---




[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180940991
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
--- End diff --

whoa .. i dont know either... my intellij did some weird magic :/


---




[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180940932
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operation related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementation of FileSystem/FileContext may have different combination of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Is the default file system this implementation is operating on the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allows
+   * [[RenameBasedFSDataOutputStream]] get atomicity by write-to-temp-file-and-rename
+   */
+  sealed trait RenameHelperMethods

[GitHub] spark pull request #20888: [SPARK-23775][TEST] Make DataFrameRangeSuite not ...

2018-04-11 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20888#discussion_r180940952
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -152,22 +154,28 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
   }
 
   test("Cancelling stage in a query with Range.") {
+val slices = 10
+
 val listener = new SparkListener {
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-eventually(timeout(10.seconds), interval(1.millis)) {
-  assert(DataFrameRangeSuite.stageToKill > 0)
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+eventually(timeout(10.seconds)) {
+  assert(DataFrameRangeSuite.isTaskStarted)
 }
-sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+sparkContext.cancelStage(taskStart.stageId)
+DataFrameRangeSuite.semaphore.release(slices)
--- End diff --

I know this is what the other test does, but after looking at this again I 
think this is still dangerous.

`sparkContext.cancelStage` is asynchronous, so you might wake up all the 
tasks and they might finish before the cancellation actually happens. If you 
just remove this line, then you guarantee the tasks won't finish unless the 
stage is canceled.

Which means you don't really need the semaphore: you can just sleep 
indefinitely in the tasks (e.g. call `wait()`). And instead of the `eventually` 
above you could use a `CountDownLatch`.

And if you think about it, you don't need the `DataFrameRangeSuite` object 
since everything here is local to this test.

All this also applies to the other test, so if you feel like cleaning up 
that one in a separate PR...
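
A rough sketch of that shape (assuming the `sparkContext`, `spark`, and `slices` from the quoted test; untested):

```scala
import java.util.concurrent.CountDownLatch
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

val taskStarted = new CountDownLatch(1)
sparkContext.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    taskStarted.countDown()
    sparkContext.cancelStage(taskStart.stageId)
  }
})

val df = spark.range(0, 1000, 1, slices).map { x =>
  // Sleep indefinitely: the task can only terminate via cancellation.
  val lock = new Object
  lock.synchronized { lock.wait() }
  x
}
// In the test body: run df.collect() on another thread, taskStarted.await(),
// then assert that the job fails with a cancellation error.
```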


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180940389
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete") {
+withTempPath { p =>
+  val basePath = new Path(p.getAbsolutePath)
+  val fm = createManager(basePath)
+  // Mkdirs
+  val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+  assert(!fm.exists(dir))
+  fm.mkdirs(dir)
+  assert(fm.exists(dir))
+  fm.mkdirs(dir)
+
+  // List
+  val acceptAllFilter = new PathFilter {
+override def accept(path: Path): Boolean = true
+  }
+  val rejectAllFilter = new PathFilter {
+override def accept(path: Path): Boolean = false
+  }
+  assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName 
== "dir"))
+  assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+  // Create atomic without overwrite
+  var path = new Path(s"$dir/file")
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = false).cancel()
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = false).close()
+  assert(fm.exists(path))
+  intercept[IOException] {
+// should throw exception since file exists and overwrite is false
+fm.createAtomic(path, overwriteIfPossible = false).close()
+  }
+
+  // Create atomic with overwrite if possible
+  path = new Path(s"$dir/file2")
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).cancel()
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).close()
+  assert(fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).close()  // should 
not throw exception
+
+  // Open and delete
+  fm.open(path).close()
+  fm.delete(path)
+  assert(!fm.exists(path))
+  intercept[IOException] {
+fm.open(path)
+  }
+  fm.delete(path) // should not throw exception
+}
+  }
+
+  protected def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+}
+
+class CheckpointFileManagerSuite extends SparkFunSuite with 
SharedSparkSession {
+
+  test("CheckpointFileManager.create() should pick up user-specified class 
from conf") {
+withSQLConf(
+  SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+classOf[TestCheckpointFileManager].getName) {
+  val fileManager =
+CheckpointFileManager.create(new Path("/"), 
spark.sessionState.newHadoopConf)
+  assert(fileManager.isInstanceOf[TestCheckpointFileManager])
+}
+  }
+
+  test("CheckpointFileManager.create() should fallback from FileContext to 
FileSystem") {
+import FakeFileSystem.scheme
+spark.conf.set(
+  s"fs.$scheme.impl",
+  classOf[FakeFileSystem].getName)
+withTempDir { temp =>
+  val metadataLog = new HDFSMetadataLog[String](spark, 
s"$scheme://${temp.toURI.getPath}")
+  

[GitHub] spark pull request #20959: [SPARK-23846][SQL] The samplingRatio option for C...

2018-04-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20959#discussion_r180940274
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -528,6 +529,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
* `header` (default `false`): uses the first line as names of 
columns.
* `inferSchema` (default `false`): infers the input schema 
automatically from data. It
* requires one extra pass over the data.
+   * `samplingRatio` (default 1.0): the sample ratio of rows used for 
schema inferring.
--- End diff --

Yes, please. It seems this option can be useful given your investigation and 
@rxin's opinion.
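
For reference, a sketch of how the option would be used once this PR is in (illustrative):

```scala
// Infer the schema from a 10% sample of rows instead of scanning everything
// during the extra inference pass.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("samplingRatio", "0.1")
  .csv("path/to/data.csv")
```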


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue

2018-04-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/21038
  
Thanks @koeninger for your comments. I think your suggestion is valid; the 
log here is just pasted from the JIRA, but we also got the same issue from a 
customer's report.

In the PR description, I mentioned using two apps with the same group id to 
mimic this issue. But I'm not sure about the customer's real use case; maybe 
in their scenario such usage is valid.

So I'm wondering if we can add a configuration to control whether this should 
fail or just warn. I also think the exception/warning log should be improved 
to tell the user directly about the consumer rebalance issue, rather than 
throwing from Kafka as "no current assignment for partition xxx".
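
One possible shape for such a switch (the conf key and `revokedPartitions` below are hypothetical, purely for illustration):

```scala
// Hypothetical flag: when false, warn about the rebalance instead of failing.
val failOnRevoke = sparkConf.getBoolean(
  "spark.streaming.kafka.consumer.failOnPartitionRevoked", defaultValue = true)

if (revokedPartitions.nonEmpty) {  // `revokedPartitions` is also illustrative
  val msg = s"Partitions $revokedPartitions were revoked, most likely because " +
    "another consumer with the same group id subscribed to the same topics"
  if (failOnRevoke) throw new IllegalStateException(msg) else logWarning(msg)
}
```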


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180936744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)`, which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementations of FileSystem/FileContext may have different combinations of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure), and this abstraction allows different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementation must make a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Whether the default file system this implementation operates on is the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename.
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938118
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)`, which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementations of FileSystem/FileContext may have different combinations of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure), and this abstraction allows different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementation must make a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Whether the default file system this implementation operates on is the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename.
+   */
+  sealed trait 

[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-11 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r180939125
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -53,32 +53,24 @@ class ContinuousSuiteBase extends StreamTest {
   // A continuous trigger that will only fire the initial time for the 
duration of a test.
   // This allows clean testing with manual epoch advancement.
   protected val longContinuousTrigger = Trigger.Continuous("1 hour")
+
+  override protected implicit val defaultTrigger = Trigger.Continuous(100)
+  override protected val defaultUseV2Sink = true
 }
 
 class ContinuousSuite extends ContinuousSuiteBase {
   import testImplicits._
 
-  test("basic rate source") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
+  test("basic") {
+val input = MemoryStream[Int]
--- End diff --

This has been done as of the current commit. Some uses of MemoryStreamBase 
are still needed in order to make the AddData test handle work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180935752
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
--- End diff --

whoa what does `FileSystem => _` do?
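
For reference, `Name => _` in a Scala import hides that name from the wildcard that follows, which here ensures the `java.io._` wildcard does not pull in a `FileSystem` that clashes with Hadoop's. A minimal illustration with `java.util.List`:

```scala
// Imports everything in java.util *except* List, so Scala's List stays unambiguous.
import java.util.{List => _, _}

object ImportHiding extends App {
  val m = new HashMap[String, Int]()  // java.util.HashMap via the wildcard
  val xs = List(1, 2, 3)              // still scala.collection.immutable.List
  m.put("size", xs.size)
  println(m)
}
```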


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938649
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
+
+  def createManager(path: Path): CheckpointFileManager
+
+  test("mkdirs, list, createAtomic, open, delete") {
+withTempPath { p =>
+  val basePath = new Path(p.getAbsolutePath)
+  val fm = createManager(basePath)
+  // Mkdirs
+  val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+  assert(!fm.exists(dir))
+  fm.mkdirs(dir)
+  assert(fm.exists(dir))
+  fm.mkdirs(dir)
+
+  // List
+  val acceptAllFilter = new PathFilter {
+override def accept(path: Path): Boolean = true
+  }
+  val rejectAllFilter = new PathFilter {
+override def accept(path: Path): Boolean = false
+  }
+  assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName 
== "dir"))
+  assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+  // Create atomic without overwrite
+  var path = new Path(s"$dir/file")
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = false).cancel()
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = false).close()
+  assert(fm.exists(path))
+  intercept[IOException] {
+// should throw exception since file exists and overwrite is false
+fm.createAtomic(path, overwriteIfPossible = false).close()
+  }
+
+  // Create atomic with overwrite if possible
+  path = new Path(s"$dir/file2")
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).cancel()
+  assert(!fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).close()
+  assert(fm.exists(path))
+  fm.createAtomic(path, overwriteIfPossible = true).close()  // should 
not throw exception
+
+  // Open and delete
+  fm.open(path).close()
+  fm.delete(path)
+  assert(!fm.exists(path))
+  intercept[IOException] {
+fm.open(path)
+  }
+  fm.delete(path) // should not throw exception
+}
+  }
+
+  protected def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+}
+
+class CheckpointFileManagerSuite extends SparkFunSuite with 
SharedSparkSession {
+
+  test("CheckpointFileManager.create() should pick up user-specified class 
from conf") {
+withSQLConf(
+  SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+classOf[TestCheckpointFileManager].getName) {
+  val fileManager =
+CheckpointFileManager.create(new Path("/"), 
spark.sessionState.newHadoopConf)
+  assert(fileManager.isInstanceOf[TestCheckpointFileManager])
+}
+  }
+
+  test("CheckpointFileManager.create() should fallback from FileContext to 
FileSystem") {
+import FakeFileSystem.scheme
+spark.conf.set(
+  s"fs.$scheme.impl",
+  classOf[FakeFileSystem].getName)
+withTempDir { temp =>
+  val metadataLog = new HDFSMetadataLog[String](spark, 
s"$scheme://${temp.toURI.getPath}")
+  

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180938989
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 ---
@@ -471,6 +470,41 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 }
   }
 
+  test("error writing [version].delta cancels the output stream") {
+
+val hadoopConf = new Configuration()
+hadoopConf.set(
+  SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+  classOf[TestCheckpointFileManager].getName)
+val remoteDir = Utils.createTempDir().getAbsolutePath
+
+val provider = newStoreProvider(
+  opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = 
hadoopConf)
+
+// Disable failure of output stream and generate versions
+TestCheckpointFileManager.shouldFailInCreateAtomic = false
+for (version <- 1 to 10) {
+  val store = provider.getStore(version - 1)
+  put(store, version.toString, version) // update "1" -> 1, "2" -> 2, 
...
+  store.commit()
+}
+val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+val store = provider.getStore(10)
+// Fail commit for next version and verify that reloading resets the 
files
+TestCheckpointFileManager.shouldFailInCreateAtomic = true
+put(store, "11", 11)
+val e = intercept[IllegalStateException] { quietly { store.commit() } }
+assert(e.getCause.isInstanceOf[IOException], "Was waiting the 
IOException to be thrown")
+TestCheckpointFileManager.shouldFailInCreateAtomic = false
+
+// Abort commit for next version and verify that reloading resets the 
files
+val store2 = provider.getStore(10)
+put(store2, "11", 11)
+store2.abort()
+assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
--- End diff --

can you verify that it was false before the `abort`?
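
i.e., something along these lines, so the later assertion actually proves that `abort` triggered the cancel:

```scala
assert(!TestCheckpointFileManager.cancelCalledInCreateAtomic)
store2.abort()
assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
```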


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180936292
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)`, which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementations of FileSystem/FileContext may have different combinations of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure), and this abstraction allows different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementation must make a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Whether the default file system this implementation operates on is the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename.
+   */
+  sealed trait 

[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/21048#discussion_r180937241
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileSystem => _, _}
+import java.util.{EnumSet, UUID}
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
+ * the key operation this interface provides is `createAtomic(path, overwrite)`, which returns a
+ * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
+ * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
+ * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
+ * without overwrite.
+ *
+ * This higher-level interface above the Hadoop FileSystem is necessary because
+ * different implementations of FileSystem/FileContext may have different combinations of operations
+ * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
+ * direct-write-and-cancel-on-failure), and this abstraction allows different implementations while
+ * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
+ */
+trait CheckpointFileManager {
+
+  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
+
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementation must make a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
+  /** List the files in a path that match a filter. */
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  /** List all the files in a path. */
+  def list(path: Path): Array[FileStatus] = {
+list(path, new PathFilter { override def accept(path: Path): Boolean = 
true })
+  }
+
+  /** Make directory at the given path and all its parent directories as needed. */
+  def mkdirs(path: Path): Unit
+
+  /** Whether path exists */
+  def exists(path: Path): Boolean
+
+  /** Recursively delete a path if it exists. Should not throw exception 
if file doesn't exist. */
+  def delete(path: Path): Unit
+
+  /** Whether the default file system this implementation operates on is the local file system. */
+  def isLocal: Boolean
+}
+
+object CheckpointFileManager extends Logging {
+
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename.
+   */
+  sealed trait 

[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20998
  
**[Test build #89228 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89228/testReport)**
 for PR 20998 at commit 
[`5901728`](https://github.com/apache/spark/commit/5901728d6be8ad33e39d56006a2bc8cc02cfff38).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21023: [SPARK-23949] makes && supports the function of predicat...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21023
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89221/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21023: [SPARK-23949] makes && supports the function of predicat...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21023
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21023: [SPARK-23949] makes && supports the function of predicat...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21023
  
**[Test build #89221 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89221/testReport)**
 for PR 21023 at commit 
[`d76d1b8`](https://github.com/apache/spark/commit/d76d1b88cd2cefe1cdb9f4ce6519429ce7df3ba0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20998: [SPARK-23888][CORE] speculative task should not r...

2018-04-11 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20998#discussion_r180937626
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -880,8 +880,8 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
   }
 
-  test("speculative task should not run on a given host where another 
attempt " +
-"is already running on") {
+  test("SPARK-23888: speculative task should not run on a given host " +
+"where another attempt is already running on") {
--- End diff --

Sure. Also, do we need to reword the PR and JIRA titles? @squito 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21037: [SPARK-23919][SQL] Add array_position function

2018-04-11 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21037
  
Is array_position a string function? The function array_contains, for 
example, works on arrays.

Also, this new function *seems* very similar to the existing instr ("Locate 
the position of the first occurrence of substr column in the given string"), 
except the return type is BigDecimal rather than Integer.

For example, I ran a query using your branch:


scala> val df = Seq((Seq(1, 2, 3), "this is a test"), (Seq(7, 8, 9, 3), 
"yeah this")).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: array<int>, b: string]

scala> df.select(array_contains('a, 2), array_position('b, "this"), 
instr('b, "this")).show
+--------------------+-----------------------+--------------+
|array_contains(a, 2)|array_position(b, this)|instr(b, this)|
+--------------------+-----------------------+--------------+
|                true|                      1|             1|
|               false|                      6|             6|
+--------------------+-----------------------+--------------+


scala> 
scala> df.select(array_position('a, 3)).show
<console>:35: error: type mismatch;
 found   : Int(3)
 required: String
       df.select(array_position('a, 3)).show
                                            ^

scala> df.select(array_contains('a, 3)).show
+--------------------+
|array_contains(a, 3)|
+--------------------+
|                true|
|                true|
+--------------------+


scala> 




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21039: [SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars...

2018-04-11 Thread rednaxelafx
Github user rednaxelafx commented on the issue:

https://github.com/apache/spark/pull/21039
  
Thanks for reverting it for me. The test failure was definitely related to 
the explicit nulling from this PR, but I can't see how that's possible yet.

First of all, in the build that first introduced my change, build 4693, 
this particular test was passing:

https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.7/4693/testReport/junit/org.apache.spark.sql/TPCDSQuerySuite/q61/

The build that failed was the one immediately after that.

Second, the stack trace from the failure indicates that 
`doProduceWithoutKeys` is indeed on the stack, and its frame is still before 
the line where I null out `bufVars`, so I can't see how `bufVars` can be null 
in `doConsumeWithoutKeys`.

Stack trace:
```
java.lang.NullPointerException
  at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithoutKeys(HashAggregateExec.scala:274)
  at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsume(HashAggregateExec.scala:171)
  at 
org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:209)
  at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:180)
  at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
  at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
  at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:182)
...
  at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at 
org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
  at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithoutKeys(HashAggregateExec.scala:237)
  at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:163)
```

The relevant line in `doConsumeWithoutKeys` is:

https://github.com/apache/spark/blob/75a183071c4ed2e407c930edfdf721779662b3ee/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L274
It's reading `bufVars`.

The relevant line in `doProduceWithoutKeys` is:

https://github.com/apache/spark/blob/75a183071c4ed2e407c930edfdf721779662b3ee/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L237
It's calling `child.produce()`, and that's before the nulling at line 241.
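
A stripped-down sketch of the call ordering being argued (illustrative only; these are not Spark's actual classes):

```scala
object BufVarsOrdering {
  private var bufVars: Seq[String] = _

  def doProduceWithoutKeys(): Unit = {
    bufVars = Seq("agg0", "agg1") // initialized before producing
    childProduce()                // calls back into doConsumeWithoutKeys()
    bufVars = null                // nulled only after child.produce() returns
  }

  private def childProduce(): Unit = doConsumeWithoutKeys()

  private def doConsumeWithoutKeys(): Unit = {
    // Runs while doProduceWithoutKeys() is still on the stack, i.e. before
    // the nulling above -- so bufVars should never be null here.
    assert(bufVars != null)
  }

  def main(args: Array[String]): Unit = doProduceWithoutKeys()
}
```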


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2233/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21048
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-04-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21048
  
**[Test build #89227 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89227/testReport)**
 for PR 21048 at commit 
[`f9965f1`](https://github.com/apache/spark/commit/f9965f1cecf58b9aa79e0a746e6e16580238ef7b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


