[GitHub] spark pull request #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...

2018-09-11 Thread henryr
Github user henryr closed the pull request at:

https://github.com/apache/spark/pull/22211


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

2018-09-11 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/22211
  
Merged to 2.1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...

2018-08-23 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/22211

[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.…

## What changes were proposed in this pull request?

Back port of #20393 and #22079.

Currently, shuffle repartition uses RoundRobinPartitioning; the 
generated result is nondeterministic because the order of the input rows is 
not determined.

The bug can be triggered when a repartition call follows a 
shuffle (which leads to non-deterministic row ordering), as in the pattern 
below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When one of the executor processes goes down, some tasks of the 
repartition stage are retried and generate an inconsistent ordering, so some 
tasks of the result stage are retried and produce different data.

The following code returns 931532, instead of 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose the most straightforward way to fix this problem: 
perform a local sort before partitioning. Once the input row 
ordering is deterministic, the function from rows to partitions is fully 
deterministic too.

The downside of the approach is that, with the extra local sort inserted, 
the performance of repartition() goes down, so we add a new config named 
`spark.sql.execution.sortBeforeRepartition` to control whether this patch is 
applied. The patch is enabled by default to be safe, but users may 
choose to turn it off manually to avoid the performance regression.

This patch also changes the output row ordering of repartition(), which 
causes a number of test cases to fail because they compare the results 
directly.

## How was this patch tested?

Added a unit test in ExchangeSuite.

With this patch (and `spark.sql.execution.sortBeforeRepartition` set to 
true), the following query returns 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

Author: Xingbo Jiang 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23207-branch-2.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22211


commit 5bdd9e353287c2a326138b25f80ee255d15942b0
Author: Xingbo Jiang 
Date:   2018-08-23T21:22:56Z

[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] 
Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

Back port of #20393 and #22079.

Currently, shuffle repartition uses RoundRobinPartitioning; the generated 
result is nondeterministic because the order of the input rows is not determined.

The bug can be triggered when a repartition call follows a 
shuffle (which leads to non-deterministic row ordering), as in the pattern 
below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When one of the executor processes goes down, some tasks of the repartition 
stage are retried and generate an inconsistent ordering, so some tasks of the 
result stage are retried and produce different data.

The following code returns 931532, instead of 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In thi

[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-21 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21482
  
Any further comments here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-21 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r197234189
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -468,6 +468,18 @@ def input_file_name():
 return Column(sc._jvm.functions.input_file_name())
 
 
+@since(2.4)
+def isinf(col):
--- End diff --

@HyukjinKwon could you clarify, please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21482
  
I think consistency with Spark's naming conventions (and therefore increased
discoverability by users) outweighs the advantage of naming it exactly after
the Impala equivalent. I do agree that multiple aliases probably aren't
worth the trouble at this point.

FWIW, I would have used this function if it had been available recently, so
it's not just hypothetical. And to me it provides some symmetry in the
support for 'special' float values, since we already have isnan().

On 7 June 2018 at 15:36, Reynold Xin  wrote:

> Thanks, Henry. In general I'm not a huge fan of adding something because
> hypothetically somebody might want it. Also if you want this to be
> compatible with Impala, wouldn't you want to name this the same way as
> Impala?
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r193898812
  
--- Diff: R/pkg/NAMESPACE ---
@@ -281,6 +281,8 @@ exportMethods("%<=>%",
   "initcap",
   "input_file_name",
   "instr",
+  "isInf",
+  "isinf",
--- End diff --

Do you know if there's any consistency behind the different capitalization 
schemes? There's `format_number`, `isnan` and `isNotNull` here, for example. 

If not, how about we just go with `isInf` for now and if other aliases are 
needed in the future they can be added and discussed then?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21482
  
@rxin, that in itself is a bit weird, but there are ways to express inf 
values in Scala and thus inf values can show up flowing through Spark plans. 
I'm not sure MySQL has any such facility.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-06 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21482
  
@rxin Other engines are all over the place:

* MySQL doesn't have support for infinity (based on my cursory look) - 1.0 
/ 0.0 is written as `null`. Also seems to be true of SQLite.
* Postgres has type-specific literals (e.g. `FLOAT8 '+Infinity'`) which you 
can use for comparison checks. 
* Impala has `is_inf()`
* Oracle has a literal value, and also a built-in predicate `SELECT f FROM 
foo WHERE f is infinite`
* SQL Server does not appear to support infinity (but that's based on 
anecdotal evidence)

One of my motivations for suggesting this builtin was to make it easier to 
share workloads with Impala. I also think it's convenient to have, since it's 
more intuitive than figuring out what the right literal incantation should be.
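
For illustration, a hedged sketch of the "literal incantation" today in the 
DataFrame API versus the proposed builtin (the column name `c` and the data 
are made up):

```
import spark.implicits._

// Today: compare against the Scala infinity literals explicitly.
val df = Seq(1.0, Double.PositiveInfinity, Double.NegativeInfinity).toDF("c")
df.filter($"c" === Double.PositiveInfinity || $"c" === Double.NegativeInfinity).show()

// With the proposed builtin (assuming it lands as `isinf`), the intent is clearer:
// df.createOrReplaceTempView("t")
// spark.sql("SELECT c FROM t WHERE isinf(c)")
```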


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-01 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r192522027
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
 ---
@@ -199,6 +199,50 @@ case class Nvl2(expr1: Expression, expr2: Expression, 
expr3: Expression, child:
   override def sql: String = s"$prettyName(${expr1.sql}, ${expr2.sql}, 
${expr3.sql})"
 }
 
+/**
+ * Evaluates to `true` iff it's Infinity.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns True evaluates to infinite else returns 
False ",
--- End diff --

"True evaluates" -> "True if expr evaluates"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-01 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r192520713
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala
 ---
@@ -56,6 +56,16 @@ class NullExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 assert(ex.contains("Null value appeared in non-nullable field"))
   }
 
+  test("IsInf") {
+checkEvaluation(IsInf(Literal(Double.PositiveInfinity)), true)
+checkEvaluation(IsInf(Literal(Double.NegativeInfinity)), true)
+checkEvaluation(IsInf(Literal(Float.PositiveInfinity)), true)
+checkEvaluation(IsInf(Literal(Float.NegativeInfinity)), true)
+checkEvaluation(IsInf(Literal.create(null, DoubleType)), false)
+checkEvaluation(IsInf(Literal(Float.MaxValue)), false)
+checkEvaluation(IsInf(Literal(5.5f)), false)
--- End diff --

check NaN as well?
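
For example, a suggested addition along these lines:

```
// NaN is a 'special' value but not infinite, so IsInf should return false.
checkEvaluation(IsInf(Literal(Double.NaN)), false)
checkEvaluation(IsInf(Literal(Float.NaN)), false)
```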


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-01 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r192521881
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
 ---
@@ -199,6 +199,50 @@ case class Nvl2(expr1: Expression, expr2: Expression, 
expr3: Expression, child:
   override def sql: String = s"$prettyName(${expr1.sql}, ${expr2.sql}, 
${expr3.sql})"
 }
 
+/**
+ * Evaluates to `true` iff it's Infinity.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns True evaluates to infinite else returns 
False ",
+  examples = """
+Examples:
+  > SELECT _FUNC_(1/0);
+   True
+  > SELECT _FUNC_(5);
+   False
+  """)
+case class IsInf(child: Expression) extends UnaryExpression
+  with Predicate with ImplicitCastInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq(TypeCollection(DoubleType, FloatType))
+
+  override def nullable: Boolean = false
+
+  override def eval(input: InternalRow): Boolean = {
+val value = child.eval(input)
+if (value == null) {
+  false
+} else {
+  child.dataType match {
+case DoubleType => value.asInstanceOf[Double].isInfinity
+case FloatType => value.asInstanceOf[Float].isInfinity
+  }
+}
+  }
+
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val eval = child.genCode(ctx)
+child.dataType match {
+  case DoubleType | FloatType =>
+ev.copy(code = code"""
+  ${eval.code}
+  ${CodeGenerator.javaType(dataType)} ${ev.value} = 
${CodeGenerator.defaultValue(dataType)};
+  ${ev.value} = !${eval.isNull} && 
Double.isInfinite(${eval.value});""",
--- End diff --

out of interest, why use `Double.isInfinite` here, but `value.isInfinity` 
in the non-codegen version?
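
(For context, the two spellings should be equivalent; a quick sketch outside 
of codegen:)

```
val x = 1.0 / 0.0                 // Double.PositiveInfinity

// Scala's RichDouble method, as used in the interpreted eval path:
x.isInfinity                      // true

// The java.lang.Double static method, as emitted by the generated code:
java.lang.Double.isInfinite(x)    // true
```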


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-01 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r192520834
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -1107,6 +1107,14 @@ object functions {
*/
   def input_file_name(): Column = withExpr { InputFileName() }
 
+  /**
+   * Return true iff the column is Infinity.
+   *
+   * @group normal_funcs
+   * @since 1.6.0
--- End diff --

Need to fix these versions, here and elsewhere. This change would land in 
Spark 2.4.0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-01 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r192520566
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala
 ---
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.types._
 
-class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper 
{
+  class NullExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
--- End diff --

Revert this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3

2018-05-14 Thread henryr
Github user henryr closed the pull request at:

https://github.com/apache/spark/pull/21302


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3

2018-05-14 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21302
  
Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3

2018-05-14 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21302#discussion_r188042296
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -602,6 +602,16 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
   }
 }
   }
+
+  test("SPARK-23852: Broken Parquet push-down for partially-written 
stats") {
+// parquet-1217.parquet contains a single column with values -1, 0, 1, 
2 and null.
+// The row-group statistics include null counts, but not min and max 
values, which
+// triggers PARQUET-1217.
+val df = readResourceParquetFile("test-data/parquet-1217.parquet")
--- End diff --

PR for master is https://github.com/apache/spark/pull/21323. My guess is 
there's no reason to block this backport and 2.3.1 by waiting for it to land, 
but happy to do whatever.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21323: [SPARK-23582][SQL] Add withSQLConf(...) to test c...

2018-05-14 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/21323

[SPARK-23582][SQL] Add withSQLConf(...) to test case

## What changes were proposed in this pull request?

Add a `withSQLConf(...)` wrapper to force Parquet filter pushdown for a 
test that relies on it. 

## How was this patch tested?

Test passes
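
For reference, the change is roughly of this shape: a hedged sketch assuming 
the `withSQLConf` helper from SQLTestUtils and 
`SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED`; the column name and expected count 
below are illustrative, not taken from the actual test.

```
// Force Parquet filter pushdown on for the duration of the test body,
// so the PARQUET-1217 regression test exercises the pushed-down predicate.
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  val df = readResourceParquetFile("test-data/parquet-1217.parquet")
  assert(df.where("col > 0").count() === 2)
}
```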

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23582

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21323


commit fdcacd8868de0aca3d13ae5ca5a9e323f114fab9
Author: Henry Robinson <henry@...>
Date:   2018-05-14T17:48:22Z

[SPARK-23582][SQL] Add withSQLConf(...) to test case




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3

2018-05-11 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21302
  
Sounds good, done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21302: [SPARK-23852][SQL] Upgrade to Parquet 1.8.3

2018-05-11 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/21302

[SPARK-23852][SQL] Upgrade to Parquet 1.8.3

## What changes were proposed in this pull request?

Upgrade Parquet dependency to 1.8.3 to avoid PARQUET-1217

## How was this patch tested?

Ran testcase from SPARK-23852 (will backport in a separate PR after this 
goes in).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark branch-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21302.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21302


commit 35e214995201d6b3a9a013d0f8d2106b084f4de9
Author: Henry Robinson <henry@...>
Date:   2018-05-11T18:50:26Z

[SPARK-23852][SQL] Upgrade to Parquet 1.8.3




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-05-09 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21049
  
@dilipbiswal I tried hard to find something in the SQL standard that 
clarified the situation, but couldn't (of course that could be because the 
standard is pretty hard to parse... :)). 

So let's go with the idea of not dropping `ORDER BY` in inline views, but 
we can safely drop them in scalar subqueries and nested subqueries. What do you 
think?
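
To illustrate the distinction with hypothetical tables `t1`/`t2` (these are 
not queries from the patch):

```
// Nested (IN) subquery: its ORDER BY cannot affect which rows match,
// so the sort below it is redundant and can be dropped.
spark.sql("SELECT a FROM t1 WHERE a IN (SELECT b FROM t2 ORDER BY b)")

// Inline view: the ORDER BY may express an ordering the user expects in the
// output, so per the discussion above it is left untouched for now.
spark.sql("SELECT * FROM (SELECT a FROM t1 ORDER BY a) v")
```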


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21284: [SPARK-23852][SQL] Add test that fails if PARQUET...

2018-05-09 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/21284

[SPARK-23852][SQL] Add test that fails if PARQUET-1217 is not fixed

## What changes were proposed in this pull request?

Add a new test that triggers if PARQUET-1217 - a predicate pushdown bug - 
is not fixed in Spark's Parquet dependency. 

## How was this patch tested?

New unit test passes. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23852

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21284


commit 4b42ee2c54a2f5488f6cc4e65ce5a401c16a6d8b
Author: Henry Robinson <henry@...>
Date:   2018-04-12T18:53:13Z

[SPARK-23852][SQL] Add test that fails if PARQUET-1217 is not fixed




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-07 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21201
  
Any chance to get this merged now the tests are working again? Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-05-04 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21049
  
I might be a bit of a hardliner on this, but I think it's correct to 
eliminate the `ORDER BY` from common table expressions (e.g. MSSQL agrees 
with me, see [this 
link](https://docs.microsoft.com/en-us/sql/t-sql/queries/with-common-table-expression-transact-sql?view=sql-server-2017#guidelines-for-creating-and-using-common-table-expressions)).

However, given the principle of least surprise, I agree it might be a good 
idea to at least start with scalar and nested subqueries, and leave inline 
views for another day. That might be a bit harder to do (I think the rule will 
need a whitelist of operators it's ok to eliminate sorts below), and in general 
I think there'll be some missed opportunities, but it's a start :)

Alternatively we could extend the analyzed logical plan to explicitly mark 
the different subquery types (i.e. have a `InlineView` node, a `NestedSubquery` 
node and so on). That would make these optimizations easier to express, but I 
have some reservations about the semantics of introducing those nodes. What do 
you think @dilipbiswal / @gatorsmile ?
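
To make the whitelist idea concrete, here is a very rough Catalyst-style 
sketch; it is not the rule from this PR, and the operator coverage (just 
`Aggregate`) is illustrative only:

```
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

// Only remove a Sort when its direct parent is in a whitelist of operators
// whose results do not depend on the ordering of their input.
object RemoveSortBelowOrderInsensitiveOps extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case agg @ Aggregate(_, _, Sort(_, _, child)) => agg.copy(child = child)
  }
}
```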


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...

2018-05-04 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21145#discussion_r186143060
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---
@@ -22,20 +22,20 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A reader factory returned by {@link 
DataSourceReader#createDataReaderFactories()} and is
+ * A read task returned by {@link DataSourceReader#createReadTasks()} and 
is
--- End diff --

Ok - how about `ReadTaskDescriptor`? (In that case I think it would be ok 
to leave the method name as `createReadTasks()`). 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to ReadTask...

2018-05-02 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21145
  
I don't mind `ReadTask`. It's imperfect because 'task' implies that this is 
a thing that can be executed, whereas this interface doesn't have a way to pass 
control to the task object. It's more like a description of a read task, but I 
think `ReadTaskDescriptor` would be a bit verbose. 

I certainly agree that this is not a factory. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-02 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21201
  
No problem! It's a small usability fix. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21201
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21201: [SPARK-24128][SQL] Mention configuration option i...

2018-04-30 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/21201

[SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error

## What changes were proposed in this pull request?

Mention `spark.sql.crossJoin.enabled` in error message when an implicit 
`CROSS JOIN` is detected.

## How was this patch tested?

`CartesianProductSuite` and `JoinSuite`. 
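
For context, a hedged sketch of what the improved error message points users 
at (the DataFrames `df1` and `df2` are illustrative):

```
// Either opt in to implicit cartesian products for the session...
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// ...or make the intent explicit instead of relying on the config:
val result = df1.crossJoin(df2)
```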


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-24128

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21201






---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-27 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21049
  
@dilipbiswal thanks for the clarification. 

I agree that this particular case - where the alias is the root of a 
logical plan - might need special handling. Is there any reason to actually use 
an alias at the root of a plan like this (outside of composing with other 
plans, where this optimization would apply)? My suggestion would be, since 
there are no references to the name the alias introduces, to consider just 
dropping the alias node during optimization (and then the sort would not get 
dropped). 

It does seem to be an edge case though - no matter how we handle 
unreferred-to aliases, the optimization seems to be appropriate for the general 
case where aliases do correspond to subqueries. What do you think?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to be an in...

2018-04-27 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21122
  
cc @rdblue 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-25 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21070
  
yes, thanks @maropu! +1 to the idea of making this a jenkins job. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-24 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21073
  
@gatorsmile this looks ready for your review (asking because you filed the 
JIRA) if you time, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-24 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21070
  
Ok, thanks for the context. I've worked on other projects where it's 
sometimes ok to take a small risk of a perf hit on trunk as long as the 
community is committed to addressing the issues before the next release. If the 
norms here are not to absorb any risk on trunk, that also seems reasonable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-24 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21070
  
@cloud-fan since there’s probably quite some time before this lands in a 
release, what do you think about merging this now if it’s ready, and filing 
the perf jira as a blocker against 2.4? My guess is that Spark will want to 
move to 1.10 at some point no matter what, so doing it this way has the 
advantage of giving plenty of time for any other issues to shake out before the 
next release. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183560436
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2186,6 +2186,29 @@ def map_values(col):
 return Column(sc._jvm.functions.map_values(_to_java_column(col)))
 
 
+@since(2.4)
+def map_concat(*cols):
+"""Returns the union of all the given maps. If a key is found in 
multiple given maps,
+that key's value in the resulting map comes from the last one of those 
maps.
+
+:param cols: list of column names (string) or list of :class:`Column` 
expressions
+
+>>> from pyspark.sql.functions import map_concat
+>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c', 1, 
'd') as map2")
+>>> df.select(map_concat("map1", 
"map2").alias("map3")).show(truncate=False)
+++
+|map3|
+++
+|[1 -> d, 2 -> b, 3 -> c]|
+++
+"""
+sc = SparkContext._active_spark_context
+if len(cols) == 1 and isinstance(cols[0], (list, set)):
+cols = cols[0]
--- End diff --

what's this for?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183558371
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +118,154 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """)
+case class MapConcat(children: Seq[Expression]) extends Expression
+  with CodegenFallback {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// this check currently does not allow valueContainsNull to vary,
+// and unfortunately none of the MapType toString methods include
+// valueContainsNull for the error message
+if (children.size < 2) {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName expects at least two input maps.")
+} else if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+children.headOption.map(_.dataType.asInstanceOf[MapType])
+  .getOrElse(MapType(keyType = StringType, valueType = StringType))
+  }
+
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
+}
+val (keyArray, valueArray) = union.entrySet().toArray().map { e =>
+  val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]]
+  (e2.getKey, e2.getValue)
+}.unzip
+new ArrayBasedMapData(new GenericArrayData(keyArray), new 
GenericArrayData(valueArray))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val mapCodes = children.map(c => c.genCode(ctx))
+val keyType = children.head.dataType.asInstanceOf[MapType].keyType
+val valueType = children.head.dataType.asInstanceOf[MapType].valueType
+val mapRefArrayName = ctx.freshName("mapRefArray")
+val unionMapName = ctx.freshName("union")
+
+val mapDataClass = classOf[MapData].getName
+val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName
+val arrayDataClass = classOf[ArrayData].getName
+val genericArrayDataClass = classOf[GenericArrayData].getName
+val hashMapClass = classOf[util.LinkedHashMap[Any, Any]].getName
+val entryClass = classOf[util.Map.Entry[Any, Any]].getName
+
+val init =
+  s"""
+|$mapDataClass[] $mapRefArrayName = new 
$mapDataClass[${mapCodes.size}];
+|boolean ${ev.isNull} = false;
+|$mapDataClass ${ev.value} = null;
+  """.stripMargin
+
+val assignments = mapCodes.zipWithIndex.map { case (m, i) =>
+  val initCode = mapCodes(i).code
+  val valueVarName = mapCodes(i).value.code
+  s"""
+ |$initCode
+ |$mapRefArrayName[$i] = $valueVarName;
+ |if ($valueVarName == null) {
+ |  ${ev.isNull} = true;
+ |}
+   """.stripMargin
+}.mkString("\n")
+
+val index1Name = ctx.freshName("idx1")
+val index2Name = ctx.freshName("idx2")
+val mapDataName = ctx.freshName("m")
+val kaName = ctx.freshName("ka")
+val vaName = ctx.freshName("va")
+val keyName = ctx.freshName("key")
+val valueName = ctx.freshName("value")
+val isNullCheckName = ctx.freshName("isNull")
--- End diff --

unused?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183559826
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -376,6 +376,35 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 )
   }
 
+  test("map_concat function") {
+val df1 = Seq(
+  (Map[Int, Int](1 -> 100, 2 -> 200), Map[Int, Int](3 -> 300, 4 -> 
400)),
+  (Map[Int, Int](1 -> 100, 2 -> 200), Map[Int, Int](3 -> 300, 1 -> 
400)),
+  (null, Map[Int, Int](3 -> 300, 4 -> 400))
+).toDF("map1", "map2")
+checkAnswer(
+  df1.selectExpr("map_concat(map1, map2)"),
+  Seq(
+Row(Map(1 -> 100, 2 -> 200, 3 -> 300, 4 -> 400)),
+Row(Map(1 -> 400, 2 -> 200, 3 -> 300)),
+Row(null)
+  )
+)
+
+val df2 = Seq(
+  (Map[Int, Int](1 -> 100, 2 -> 200), Map[String, Int]("3" -> 300, "4" 
-> 400))
+).toDF("map1", "map2")
+assert(intercept[AnalysisException] {
+  df2.selectExpr("map_concat(map1, map2)").collect()
+}.getMessage().contains("input maps of function map_concat should all 
be the same type"))
+assert(intercept[AnalysisException] {
--- End diff --

can you put a blank line between tests? makes it a bit easier to see the 
separation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183559190
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +118,154 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """)
+case class MapConcat(children: Seq[Expression]) extends Expression
+  with CodegenFallback {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// this check currently does not allow valueContainsNull to vary,
+// and unfortunately none of the MapType toString methods include
+// valueContainsNull for the error message
+if (children.size < 2) {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName expects at least two input maps.")
+} else if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+children.headOption.map(_.dataType.asInstanceOf[MapType])
+  .getOrElse(MapType(keyType = StringType, valueType = StringType))
+  }
+
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
+}
+val (keyArray, valueArray) = union.entrySet().toArray().map { e =>
+  val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]]
+  (e2.getKey, e2.getValue)
+}.unzip
+new ArrayBasedMapData(new GenericArrayData(keyArray), new 
GenericArrayData(valueArray))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val mapCodes = children.map(c => c.genCode(ctx))
+val keyType = children.head.dataType.asInstanceOf[MapType].keyType
+val valueType = children.head.dataType.asInstanceOf[MapType].valueType
+val mapRefArrayName = ctx.freshName("mapRefArray")
+val unionMapName = ctx.freshName("union")
+
+val mapDataClass = classOf[MapData].getName
+val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName
+val arrayDataClass = classOf[ArrayData].getName
+val genericArrayDataClass = classOf[GenericArrayData].getName
+val hashMapClass = classOf[util.LinkedHashMap[Any, Any]].getName
+val entryClass = classOf[util.Map.Entry[Any, Any]].getName
+
+val init =
+  s"""
+|$mapDataClass[] $mapRefArrayName = new 
$mapDataClass[${mapCodes.size}];
+|boolean ${ev.isNull} = false;
+|$mapDataClass ${ev.value} = null;
+  """.stripMargin
+
+val assignments = mapCodes.zipWithIndex.map { case (m, i) =>
+  val initCode = mapCodes(i).code
+  val valueVarName = mapCodes(i).value.code
+  s"""
+ |$initCode
+ |$mapRefArrayName[$i] = $valueVarName;
+ |if ($valueVarName == null) {
+ |  ${ev.isNull} = true;
+ |}
+   """.stripMargin
+}.mkString("\n")
+
+val index1Name = ctx.freshName("idx1")
+val index2Name = ctx.freshName("idx2")
+val mapDataName = ctx.freshName("m")
+val kaName = ctx.freshName("ka")
+val vaName = ctx.freshName("va")
+val keyNam

[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183560201
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +118,154 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """)
+case class MapConcat(children: Seq[Expression]) extends Expression
+  with CodegenFallback {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// this check currently does not allow valueContainsNull to vary,
+// and unfortunately none of the MapType toString methods include
+// valueContainsNull for the error message
+if (children.size < 2) {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName expects at least two input maps.")
+} else if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+children.headOption.map(_.dataType.asInstanceOf[MapType])
+  .getOrElse(MapType(keyType = StringType, valueType = StringType))
+  }
+
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
+}
+val (keyArray, valueArray) = union.entrySet().toArray().map { e =>
+  val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]]
+  (e2.getKey, e2.getValue)
+}.unzip
+new ArrayBasedMapData(new GenericArrayData(keyArray), new 
GenericArrayData(valueArray))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val mapCodes = children.map(c => c.genCode(ctx))
--- End diff --

FWIW, I don't really feel strongly either way here. The codegen method 
isn't so large as to be hard to understand yet. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183559429
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +118,154 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """)
+case class MapConcat(children: Seq[Expression]) extends Expression
+  with CodegenFallback {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// this check currently does not allow valueContainsNull to vary,
+// and unfortunately none of the MapType toString methods include
+// valueContainsNull for the error message
+if (children.size < 2) {
+  TypeCheckResult.TypeCheckFailure(
+s"$prettyName expects at least two input maps.")
+} else if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+children.headOption.map(_.dataType.asInstanceOf[MapType])
+  .getOrElse(MapType(keyType = StringType, valueType = StringType))
+  }
+
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
+}
+val (keyArray, valueArray) = union.entrySet().toArray().map { e =>
+  val e2 = e.asInstanceOf[java.util.Map.Entry[Any, Any]]
+  (e2.getKey, e2.getValue)
+}.unzip
+new ArrayBasedMapData(new GenericArrayData(keyArray), new 
GenericArrayData(valueArray))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val mapCodes = children.map(c => c.genCode(ctx))
+val keyType = children.head.dataType.asInstanceOf[MapType].keyType
+val valueType = children.head.dataType.asInstanceOf[MapType].valueType
+val mapRefArrayName = ctx.freshName("mapRefArray")
+val unionMapName = ctx.freshName("union")
+
+val mapDataClass = classOf[MapData].getName
+val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName
+val arrayDataClass = classOf[ArrayData].getName
+val genericArrayDataClass = classOf[GenericArrayData].getName
+val hashMapClass = classOf[util.LinkedHashMap[Any, Any]].getName
+val entryClass = classOf[util.Map.Entry[Any, Any]].getName
+
+val init =
+  s"""
+|$mapDataClass[] $mapRefArrayName = new 
$mapDataClass[${mapCodes.size}];
+|boolean ${ev.isNull} = false;
+|$mapDataClass ${ev.value} = null;
+  """.stripMargin
+
+val assignments = mapCodes.zipWithIndex.map { case (m, i) =>
+  val initCode = mapCodes(i).code
+  val valueVarName = mapCodes(i).value.code
+  s"""
+ |$initCode
+ |$mapRefArrayName[$i] = $valueVarName;
+ |if ($valueVarName == null) {
+ |  ${ev.isNull} = true;
+ |}
+   """.stripMargin
+}.mkString("\n")
+
+val index1Name = ctx.freshName("idx1")
+val index2Name = ctx.freshName("idx2")
+val mapDataName = ctx.freshName("m")
+val kaName = ctx.freshName("ka")
+val vaName = ctx.freshName("va")
+val keyNam

[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r183559663
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
 ---
@@ -56,6 +58,26 @@ class CollectionExpressionsSuite extends SparkFunSuite 
with ExpressionEvalHelper
 checkEvaluation(MapValues(m2), null)
   }
 
+  test("Map Concat") {
+val m0 = Literal.create(Map("a" -> "1", "b" -> "2"), 
MapType(StringType, StringType))
+val m1 = Literal.create(Map("c" -> "3", "a" -> "4"), 
MapType(StringType, StringType))
+val m2 = Literal.create(Map("d" -> "4", "e" -> "5"), 
MapType(StringType, StringType))
+val mNull = Literal.create(null, MapType(StringType, StringType))
+
+// overlapping maps
+checkEvaluation(MapConcat(Seq(m0, m1)),
+  mutable.LinkedHashMap("a" -> "4", "b" -> "2", "c" -> "3"))
+// maps with no overlap
+checkEvaluation(MapConcat(Seq(m0, m2)),
+  mutable.LinkedHashMap("a" -> "1", "b" -> "2", "d" -> "4", "e" -> 
"5"))
+// 3 maps
+checkEvaluation(MapConcat(Seq(m0, m1, m2)),
+  mutable.LinkedHashMap("a" -> "4", "b" -> "2", "c" -> "3", "d" -> 
"4", "e" -> "5"))
+// null map
+checkEvaluation(MapConcat(Seq(m0, mNull)),
--- End diff --

good idea to check `Seq(mNull, m0)` as well in case there's any asymmetry 
in the way the first argument is handled.
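
Something like the following (a suggested addition; it assumes null inputs 
propagate to a null result, as in the `eval` implementation shown earlier):

```
// Swap the argument order to make sure null handling is symmetric.
checkEvaluation(MapConcat(Seq(mNull, m0)), null)
```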


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-23 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21049
  
@dilipbiswal Thanks! Although Spark doesn't necessarily parse the query in 
the `from` clause as a subquery, is it fair to say it plans it as one? (Since 
the planner puts the alias under a `SubqueryAlias` node). The optimization of 
removing the sorts seems valid to me in any case, since neither an alias nor a 
subquery should be sorting except in the presence of a `limit`. Or do you think 
that this optimization should only be applied to subqueries that aren't in the 
`from` clause?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-23 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21070
  
This looks pretty good to me - are there any committers that can give it a 
(hopefully) final review?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-18 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r182547477
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -115,6 +116,62 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """)
+case class MapConcat(children: Seq[Expression]) extends Expression
+  with CodegenFallback {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// this check currently does not allow valueContainsNull to vary,
+// and unfortunately none of the MapType toString methods include
+// valueContainsNull for the error message
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+  override def dataType: MapType = {
+children.headOption.map(_.dataType.asInstanceOf[MapType])
+  .getOrElse(MapType(keyType = StringType, valueType = StringType))
+  }
+
+  override def nullable: Boolean = false
--- End diff --

Hm, seems a bit unusual to me to have, in effect, `NULL ++ NULL => Map()`. 
I checked with Presto and it looks like it returns `NULL`:

presto> select map_concat(NULL, NULL)
 -> ;
 _col0
---
 NULL
(1 row)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

2018-04-17 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21070
  
@scottcarey I agree that's important. Perhaps it could be done as a 
follow-up PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL][WIP] Implement map_concat

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r181915827
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -115,6 +116,62 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """)
+case class MapConcat(children: Seq[Expression]) extends Expression
+  with CodegenFallback {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// this check currently does not allow valueContainsNull to vary,
+// and unfortunately none of the MapType toString methods include
+// valueContainsNull for the error message
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+  override def dataType: MapType = {
+children.headOption.map(_.dataType.asInstanceOf[MapType])
+  .getOrElse(MapType(keyType = StringType, valueType = StringType))
+  }
+
+  override def nullable: Boolean = false
--- End diff --

What's the result of `map_concat(NULL, NULL)`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21068#discussion_r181908529
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+private[spark] class YarnAllocatorBlacklistTracker(
+sparkConf: SparkConf,
+amClient: AMRMClient[ContainerRequest],
+failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker)
+  extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  private val BLACKLIST_TIMEOUT_MILLIS =
+
sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT))
+
+  private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED =
+sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false)
+
+  private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = 
sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
+
+  private val BLACKLIST_SIZE_LIMIT = 
sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT)
+
+  private val BLACKLIST_SIZE_DEFAULT_WEIGHT = 
sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT)
+
+  private var clock: Clock = new SystemClock
+
+  private val allocationBlacklistedNodesWithExpiry = new HashMap[String, 
Long]()
+
+  private var currentBlacklistedYarnNodes = Set.empty[String]
+
+  private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]
--- End diff --

Do you need to keep a separate data structure for the scheduler and 
allocator blacklisted nodes? Instead, could you add the scheduler ones into a 
shared map when `setSchedulerBlacklistedNodes` is called?
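
A rough sketch of what I mean (the signature of `setSchedulerBlacklistedNodes` is
assumed here, and `blacklistedNodesWithExpiry` is a hypothetical shared map):

    // One shared host -> expiry map for both sources of blacklisted nodes.
    private val blacklistedNodesWithExpiry = new HashMap[String, Long]()

    def setSchedulerBlacklistedNodes(schedulerNodes: Map[String, Long]): Unit = {
      blacklistedNodesWithExpiry ++= schedulerNodes
    }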


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21068#discussion_r181911744
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, SystemClock}
+
+private[spark] class FailureWithinTimeIntervalTracker(sparkConf: 
SparkConf) extends Logging {
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+
sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+
+  // Queue to store the timestamp of failed executors for each host
+  private val failedExecutorsTimeStampsPerHost = mutable.Map[String, 
mutable.Queue[Long]]()
+
+  private val failedExecutorsTimeStamps = new mutable.Queue[Long]()
+
+  private def getNumFailuresWithinValidityInterval(
+  failedExecutorsTimeStampsForHost: mutable.Queue[Long],
+  endTime: Long): Int = {
+while (executorFailuresValidityInterval > 0
+  && failedExecutorsTimeStampsForHost.nonEmpty
+  && failedExecutorsTimeStampsForHost.head < endTime - 
executorFailuresValidityInterval) {
--- End diff --

This relies on the fact that `clock` is monotonic, but if it's a 
`SystemClock` it's based on `System.currentTimeMillis()`, which is not monotonic 
and can jump backwards when the system clock is adjusted. 
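
If all we need is elapsed time, something along these lines would avoid the
problem (sketch only; `monotonicNowMs` is a hypothetical helper):

    // System.nanoTime() isn't tied to wall-clock time, so it doesn't jump when
    // the system clock is adjusted.
    private def monotonicNowMs(): Long = System.nanoTime() / 1000000L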


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21068#discussion_r181910914
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, SystemClock}
+
+private[spark] class FailureWithinTimeIntervalTracker(sparkConf: 
SparkConf) extends Logging {
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+
sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+
+  // Queue to store the timestamp of failed executors for each host
+  private val failedExecutorsTimeStampsPerHost = mutable.Map[String, 
mutable.Queue[Long]]()
+
+  private val failedExecutorsTimeStamps = new mutable.Queue[Long]()
+
+  private def getNumFailuresWithinValidityInterval(
+  failedExecutorsTimeStampsForHost: mutable.Queue[Long],
+  endTime: Long): Int = {
+while (executorFailuresValidityInterval > 0
+  && failedExecutorsTimeStampsForHost.nonEmpty
+  && failedExecutorsTimeStampsForHost.head < endTime - 
executorFailuresValidityInterval) {
+  failedExecutorsTimeStampsForHost.dequeue()
--- End diff --

It's counter-intuitive that this `get*` method mutates state. If I called:

    getNumFailuresWithinValidityInterval(foo, 0)
    getNumFailuresWithinValidityInterval(foo, 10)
    getNumFailuresWithinValidityInterval(foo, 0)

the last call could return something different from the first, because the second 
call will have dropped every failure older than `10 - executorFailuresValidityInterval`. 
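
A side-effect-free shape this could take (sketch; `countRecentFailures` is a
hypothetical helper, not something in this PR):

    // Count without dequeuing, so repeated calls with the same arguments agree.
    private def countRecentFailures(timestamps: mutable.Queue[Long], since: Long): Int =
      timestamps.count(_ >= since)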


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21068#discussion_r181907316
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+private[spark] class YarnAllocatorBlacklistTracker(
+sparkConf: SparkConf,
+amClient: AMRMClient[ContainerRequest],
+failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker)
+  extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  private val BLACKLIST_TIMEOUT_MILLIS =
+
sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT))
+
+  private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED =
+sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false)
+
+  private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = 
sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
+
+  private val BLACKLIST_SIZE_LIMIT = 
sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT)
+
+  private val BLACKLIST_SIZE_DEFAULT_WEIGHT = 
sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT)
+
+  private var clock: Clock = new SystemClock
+
+  private val allocationBlacklistedNodesWithExpiry = new HashMap[String, 
Long]()
+
+  private var currentBlacklistedYarnNodes = Set.empty[String]
+
+  private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]
+
+  private var numClusterNodes = (Int.MaxValue / 
BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt
+
+  def setNumClusterNodes(numClusterNodes: Int): Unit = {
+this.numClusterNodes = numClusterNodes
+  }
+
+  /**
+   * Use a different clock. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+clock = newClock
+  }
+
+  def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = {
+hostOpt match {
+  case Some(hostname) =>
+// failures on a already blacklisted nodes are not even tracked
+// otherwise such failures could shutdown the application
+// as resource requests are asynchronous
+// and a late failure response could exceed MAX_EXECUTOR_FAILURES
+if (!schedulerBlacklistedNodesWithExpiry.contains(hostname) &&
+  !allocationBlacklistedNodesWithExpiry.contains(hostname)) {
+  failureWithinTimeIntervalTracker.registerFailureOnHost(hostname)
+  updateAllocationBlacklistedNodes(hostname)
+}
+  case None =>
+failureWithinTimeIntervalTracker.registerExecutorFailure()
+}
+  }
+
+  private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
+if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
--- End diff --

consider just:

if (!IS_YARN_ALLOCATION_BLACKLIST_ENABLED) return;

to save a level of indentation below.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21068#discussion_r181912300
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, SystemClock}
+
+private[spark] class FailureWithinTimeIntervalTracker(sparkConf: 
SparkConf) extends Logging {
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+
sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+
+  // Queue to store the timestamp of failed executors for each host
+  private val failedExecutorsTimeStampsPerHost = mutable.Map[String, 
mutable.Queue[Long]]()
+
+  private val failedExecutorsTimeStamps = new mutable.Queue[Long]()
+
+  private def getNumFailuresWithinValidityInterval(
--- End diff --

It's not really clear what a 'validity interval' is. I think it means that 
only failures that have happened recently are considered valid? I think it 
would be clearer to call this `getNumFailuresSince()`, or 
`getRecentFailureCount()` or similar, and explicitly pass in the timestamp the 
caller wants to consider failures since. 

If you do the latter, and drop the `endTime` argument, then you partly 
address the issue I raise below about how this mutates state, because 
`getRecentFailureCount()` suggests more clearly that it's expecting to take 
into account the current time. 
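
For example, the call site could look something like this (sketch;
`getRecentFailureCount` is the hypothetical renamed method):

    // The caller computes the cutoff, so the method doesn't need to consult the
    // clock or mutate anything.
    val cutoff = clock.getTimeMillis() - executorFailuresValidityInterval
    val recentFailures = getRecentFailureCount(failedExecutorsTimeStamps, cutoff)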


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21068#discussion_r181907395
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+private[spark] class YarnAllocatorBlacklistTracker(
+sparkConf: SparkConf,
+amClient: AMRMClient[ContainerRequest],
+failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker)
+  extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  private val BLACKLIST_TIMEOUT_MILLIS =
+
sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT))
+
+  private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED =
+sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false)
+
+  private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = 
sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
+
+  private val BLACKLIST_SIZE_LIMIT = 
sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT)
+
+  private val BLACKLIST_SIZE_DEFAULT_WEIGHT = 
sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT)
+
+  private var clock: Clock = new SystemClock
+
+  private val allocationBlacklistedNodesWithExpiry = new HashMap[String, 
Long]()
+
+  private var currentBlacklistedYarnNodes = Set.empty[String]
+
+  private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]
+
+  private var numClusterNodes = (Int.MaxValue / 
BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt
+
+  def setNumClusterNodes(numClusterNodes: Int): Unit = {
+this.numClusterNodes = numClusterNodes
+  }
+
+  /**
+   * Use a different clock. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+clock = newClock
+  }
+
+  def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = {
+hostOpt match {
+  case Some(hostname) =>
+// failures on a already blacklisted nodes are not even tracked
+// otherwise such failures could shutdown the application
+// as resource requests are asynchronous
+// and a late failure response could exceed MAX_EXECUTOR_FAILURES
+if (!schedulerBlacklistedNodesWithExpiry.contains(hostname) &&
+  !allocationBlacklistedNodesWithExpiry.contains(hostname)) {
+  failureWithinTimeIntervalTracker.registerFailureOnHost(hostname)
+  updateAllocationBlacklistedNodes(hostname)
+}
+  case None =>
+failureWithinTimeIntervalTracker.registerExecutorFailure()
+}
+  }
+
+  private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
+if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
+  val failuresOnHost = 
failureWithinTimeIntervalTracker.getNumExecutorFailuresOnHost(hostname)
+  if (failuresOnHost > BLACKLIST_MAX_FAILED_EXEC_PER_NODE) {
+logInfo("blacklisting host as YARN allocation failed: 
%s".format(hostname))
--- End diff --

The log message could also include the number of failures (`failuresOnHost`).
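
For example (sketch):

    logInfo(s"Blacklisting host $hostname: $failuresOnHost executor failures " +
      s"exceeded the allowed maximum of $BLACKLIST_MAX_FAILED_EXEC_PER_NODE")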


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181901031
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
--- End diff --

Is this used anywhere other than line 154? If not, it can be inlined.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181902045
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
+try {
+  return (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
--- End diff --

Could you preserve the comment about "Bytes are stored as a 4-byte little 
endian int. Just read the first byte."?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181889287
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

It seems worth it to me to be defensive against performance regressions - but 
feel free to punt it to me as a follow-on patch if you'd rather.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181882476
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Isn't that what `hasArray()` is for, though? If the buffer is backed by a byte 
array, `hasArray()` returns true and accessing that array via `array()` should be 
zero-cost. (If `array()` actually copied any data, that would invalidate this line 
of reasoning, but it would also be unexpected.)

So for example, here you'd have:

    public final void readIntegers(int total, WritableColumnVector c, int rowId) {
      int requiredBytes = total * 4;
      ByteBuffer buffer = getBuffer(requiredBytes);
      if (buffer.hasArray()) {
        c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
      } else {
        for (int i = 0; i < total; i += 1) {
          c.putInt(rowId + i, buffer.getInt());
        }
      }
    }

This seems to be the same pattern that's in `readBinary()`, below. Let me know if 
I'm missing something!



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21072#discussion_r181847445
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
 ---
@@ -98,4 +98,31 @@ class RemoveRedundantSortsSuite extends PlanTest {
 val correctAnswer = groupedAndResorted.analyze
 comparePlans(optimized, correctAnswer)
   }
+
+  test("remove two consecutive sorts") {
+val orderedTwice = testRelation.orderBy('a.asc).orderBy('b.desc)
+val optimized = Optimize.execute(orderedTwice.analyze)
+val correctAnswer = testRelation.orderBy('b.desc).analyze
+comparePlans(optimized, correctAnswer)
+  }
--- End diff --

Can you add a test for three consecutive sorts? Two is the base case, three 
will help us show the inductive case :)
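
Something like this is what I have in mind (sketch, reusing the suite's existing
helpers and assuming the test relation has a third column 'c):

    test("remove three consecutive sorts") {
      val orderedThrice = testRelation.orderBy('a.asc).orderBy('b.desc).orderBy('c.asc)
      val optimized = Optimize.execute(orderedThrice.analyze)
      val correctAnswer = testRelation.orderBy('c.asc).analyze
      comparePlans(optimized, correctAnswer)
    }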


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21072#discussion_r181851593
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -736,12 +736,22 @@ object EliminateSorts extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes Sort operation if the child is already sorted
+ * Removes redundant Sort operation. This can happen:
+ * 1) if the child is already sorted
+ * 2) if the there is another Sort operator separated by 0...n 
Project/Filter operators
--- End diff --

nit: 'the there'


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21072#discussion_r181849951
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
 ---
@@ -98,4 +98,31 @@ class RemoveRedundantSortsSuite extends PlanTest {
 val correctAnswer = groupedAndResorted.analyze
 comparePlans(optimized, correctAnswer)
   }
+
--- End diff --

Could you add a test which explicitly confirms that sort.limit.sort is not 
simplified? I know the above two tests cover that case, but it's good to have 
one dedicated to testing this important property. 
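
Roughly along these lines (sketch, following the suite's existing style; the
exact expected plan may need adjusting):

    test("sort-limit-sort is not simplified") {
      val sortLimitSort = testRelation.orderBy('a.asc).limit(10).orderBy('b.desc)
      val optimized = Optimize.execute(sortLimitSort.analyze)
      comparePlans(optimized, sortLimitSort.analyze)
    }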


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181846514
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Agreed that fixing the `ByteBuffer` / `ColumnVector` interaction should be 
dealt with elsewhere. I'm just raising the possibility of _regressing_ the read 
path here because the copies are less efficient. Since it's going to be a while 
before 2.4.0, that might be ok if we commit to fixing it - but it superficially 
seems like a manageable change to the PR since the code to call the bulk APIs 
is already there.  What do you think?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21049#discussion_r181839715
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -307,6 +309,32 @@ object RemoveRedundantProject extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove [[Sort]] in subqueries that do not affect the set of rows 
produced, only their
+ * order. Subqueries produce unordered sets of rows so sorting their 
output is unnecessary.
+ */
+object RemoveSubquerySorts extends Rule[LogicalPlan] {
+
+  /**
+   * Removes all [[Sort]] operators from a plan that are accessible from 
the root operator via
+   * 0 or more [[Project]], [[Filter]] or [[View]] operators.
+   */
+  private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = {
+plan match {
+  case Sort(_, _, child) => removeTopLevelSorts(child)
+  case Project(fields, child) => Project(fields, 
removeTopLevelSorts(child))
+  case Filter(condition, child) => Filter(condition, 
removeTopLevelSorts(child))
+  case View(tbl, output, child) => View(tbl, output, 
removeTopLevelSorts(child))
+  case _ => plan
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case Subquery(child) => Subquery(removeTopLevelSorts(child))
+case SubqueryAlias(name, child) => SubqueryAlias(name, 
removeTopLevelSorts(child))
--- End diff --

Yep, that's why I added the new rule just before `EliminateSubqueryAliases` 
(which runs in the optimizer, as part of the 'finish analysis' batch). After 
`EliminateSubqueryAliases` there doesn't seem to be any way to detect 
subqueries.

Another approach I suppose would be to handle this like `SparkPlan`'s 
`requiredChildOrdering` - if a parent doesn't require any ordering of the 
child, (and the child is a `Sort` node), the child `Sort` should be dropped. 
That seems like a more fundamental change though.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21049: [SPARK-23957][SQL] Remove redundant sort operators from ...

2018-04-16 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/21049
  
In SQL, the sort in a subquery doesn't make sense because of the relational 
model - the output of a subquery is an unordered bag of tuples. Some engines 
still allow the sort, some silently drop it and some throw an error.

For example: 

* MariaDB: 
https://mariadb.com/kb/en/library/why-is-order-by-in-a-from-subquery-ignored/
* SQL Server: 
https://stackoverflow.com/questions/985921/sql-error-with-order-by-in-subquery
 
Oracle and Postgres allow the `ORDER BY`.

One issue might be that the underlying dataframe model might not be 100% 
relational - maybe dataframes _are_ sorted lists of rows, in which case this 
optimization would only be valid when using the SQL interface. If so, it's 
probably not worth the effort to maintain. But if dataframes and SQL relations 
are supposed to be equivalent, we can drop the `ORDER BY`.

We also may want to decide not to do this because it would surprise users 
who had been relying on the existing behavior.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts

2018-04-14 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21072#discussion_r181563918
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -736,12 +736,15 @@ object EliminateSorts extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes Sort operation if the child is already sorted
+ * Removes redundant Sort operation. This can happen:
+ * 1) if the child is already sorted
+ * 2) if the next operator is a Sort itself
  */
 object RemoveRedundantSorts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 case Sort(orders, true, child) if 
SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
   child
+case s @ Sort(_, _, Sort(_, _, child)) => s.copy(child = child)
--- End diff --

Thanks for doing this! It might be useful to generalise this to any pair of 
sorts separated by 0 or more projections or filters. I did this for my 
SPARK-23975 PR, see: 
https://github.com/henryr/spark/commit/bb992c2058863322a9183b2985806a87729e4168#diff-a636a87d8843eeccca90140be91d4fafR322

What do you think?
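
Roughly the shape I used there (sketch only; the linked commit differs in the
details):

    // Drop any Sort that is reachable from another Sort through only
    // Project/Filter operators.
    private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
      case Sort(_, _, child) => recursiveRemoveSort(child)
      case p @ Project(_, child) => p.copy(child = recursiveRemoveSort(child))
      case f @ Filter(_, child) => f.copy(child = recursiveRemoveSort(child))
      case other => other
    }

    // and then in the rule:
    //   case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))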


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181540952
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Couldn't it also be writing to an `OffHeapColumnVector`? 
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199

If so, I think the copy is 1MB at a time: 
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189

I agree that ByteBuffer shouldn't be supported in this PR. But there's an 
opportunity to use the bulk copy APIs which would benefit from any future 
optimization that happens. Plus even if the copy does eventually become a loop 
inside the column vector implementation, there's more chance of the JIT 
unrolling the loop since it's smaller.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181528729
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Here and elsewhere a bulk copy has been replaced by many smaller copies. It 
would be better to be able to use the bulk version. I think it would be 
preferable to at least have:

if (buffer.hasArray()) { 
  c.putIntsLittleEndian(rowId, total, buffer.array(), 0); 
} else {
  for (int i = 0 // ... etc
} 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-12 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r181253369
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+&& SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+  child
+  }
--- End diff --

Filed SPARK-23973 for this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...

2018-04-11 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21049#discussion_r180964957
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -307,6 +309,32 @@ object RemoveRedundantProject extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove [[Sort]] in subqueries that do not affect the set of rows 
produced, only their
+ * order. Subqueries produce unordered sets of rows so sorting their 
output is unnecessary.
+ */
+object RemoveSubquerySorts extends Rule[LogicalPlan] {
+
+  /**
+   * Removes all [[Sort]] operators from a plan that are accessible from 
the root operator via
+   * 0 or more [[Project]], [[Filter]] or [[View]] operators.
+   */
+  private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = {
+plan match {
+  case Sort(_, _, child) => removeTopLevelSorts(child)
+  case Project(fields, child) => Project(fields, 
removeTopLevelSorts(child))
+  case Filter(condition, child) => Filter(condition, 
removeTopLevelSorts(child))
+  case View(tbl, output, child) => View(tbl, output, 
removeTopLevelSorts(child))
+  case _ => plan
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case Subquery(child) => Subquery(removeTopLevelSorts(child))
+case SubqueryAlias(name, child) => SubqueryAlias(name, 
removeTopLevelSorts(child))
--- End diff --

Thanks! I've been trying to understand the role of `Subquery` and 
`SubqueryAlias`. My confusion is that subqueries do seem to get planned as 
`SubqueryAlias` operators, e.g.:

scala> spark.sql("SELECT count(*) from (SELECT id FROM dft ORDER BY 
id)").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('count(1), None)]
+- 'SubqueryAlias __auto_generated_subquery_name
   +- 'Sort ['id ASC NULLS FIRST], true
  +- 'Project ['id]
 +- 'UnresolvedRelation `dft`

In the example you give I (personally) think it's still reasonable to drop 
the ordering, but understand that might surprise some users. It wouldn't be 
hard to skip the root if it's a subquery - but what do you propose for 
detecting subqueries if my method isn't right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21049: [SPARK-23957][SQL] Remove redundant sort operator...

2018-04-11 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/21049

[SPARK-23957][SQL] Remove redundant sort operators from subqueries

## What changes were proposed in this pull request?

Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit). This patch
adds a new optimizer rule that removes sort operators that are directly
below subqueries (or some combination of projection and filtering below
a subquery).

## How was this patch tested?

New unit tests. All sql unit tests pass.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23957

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21049.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21049


commit bb992c2058863322a9183b2985806a87729e4168
Author: Henry Robinson <henry@...>
Date:   2018-04-12T03:44:36Z

[SPARK-23957][SQL] Remove redundant sort operators from subqueries

## What changes were proposed in this pull request?

Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit). This patch
adds a new optimizer rule that removes sort operators that are directly
below subqueries (or some combination of projection and filtering below
a subquery).

## How was this patch tested?

New unit tests. All sql unit tests pass.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179003707
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
 
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

`OrderPreservingUnaryNode`? Or perhaps do you think this would be better 
modeled as a mixin trait?
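
e.g. as a trait it might look something like this (sketch, assuming 
`outputOrdering` is declared on `LogicalPlan` as this PR does):

    // Any UnaryNode can mix this in to expose its child's ordering unchanged.
    trait OrderPreserving { self: UnaryNode =>
      override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    }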


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180601594
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+&& SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+  child
+  }
--- End diff --

You might not want to do it in this PR, but you could easily remove another 
simple kind of redundant sort, e.g.:

`rel.orderBy('a.desc).orderBy('a.asc)`

(and I think that `orderBy` is not stable, so any two consecutive `orderBy` 
operators are redundant). 
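
e.g. a rule case along these lines would catch it (sketch):

    // The inner sort's ordering is immediately overwritten by the outer one.
    case s @ Sort(_, _, Sort(_, _, grandChild)) => s.copy(child = grandChild)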


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180592716
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, 
LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

revert this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...

2018-03-19 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20687
  
@gatorsmile ok, I think the coverage right now is a reasonable start - the 
other test cases I can think of would act more like they're exercising the 
expression-walking code than the actual simplification. I look forward to 
collaborating on the follow-up PR. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...

2018-03-18 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20687
  
@gatorsmile thank you for the reviews! Are there specific test cases you'd 
like to see? I've checked correlated and uncorrelated subqueries, various 
flavours of join, aggregates with HAVING clauses, nested compound types, and so 
on. 



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-14 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r174637789
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -22,54 +22,34 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
-* push down operations into [[CreateNamedStructLike]].
-*/
-object SimplifyCreateStructOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformExpressionsUp {
-  // push down field extraction
+ * Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and 
[[CreateMap]] expressions.
+ */
+object SimplifyExtractValueOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { 
case p =>
+p.transformExpressionsUp {
--- End diff --

FWIW I think this is a particular example of a more general problem where 
expression simplification can break the correspondence between a select 
expression and its grouping equivalent.

Here's a simpler example:

`SELECT (a + b) - a FROM t GROUP BY a + b`

gets me the following:

`org.apache.spark.sql.AnalysisException: expression 't.`b`' is neither 
present in the group by, nor is it an aggregate function. Add to group by or 
wrap in first() (or first_value) if you don't care which value you get.`

Postgres also has this problem, at least in 9.3: `ERROR: column "t.a" must 
appear in the GROUP BY clause or be used in an aggregate function Position: 23`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-09 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173561557
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -22,32 +22,24 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
-* push down operations into [[CreateNamedStructLike]].
+* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and 
[[CreateMap]] expressions.
 */
-object SimplifyCreateStructOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformExpressionsUp {
-  // push down field extraction
+object SimplifyExtractValueOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { 
case p =>
+p.transformExpressionsUp {
+  // Remove redundant field extraction.
   case GetStructField(createNamedStructLike: CreateNamedStructLike, 
ordinal, _) =>
 createNamedStructLike.valExprs(ordinal)
-}
-  }
-}
 
-/**
-* push down operations into [[CreateArray]].
-*/
-object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformExpressionsUp {
-  // push down field selection (array of structs)
+  // Remove redundant array indexing.
   case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
 // instead f selecting the field on the entire array,
 // select it from each member of the array.
 // pushing down the operation this way open other optimizations 
opportunities
 // (i.e. struct(...,x,...).x)
 CreateArray(elems.map(GetStructField(_, ordinal, 
Some(field.name
-  // push down item selection.
+
+  // Remove redundant map lookup.
   case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
 // instead of creating the array and then selecting one row,
 // remove array creation altgether.
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-09 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173561516
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -22,32 +22,24 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
-* push down operations into [[CreateNamedStructLike]].
+* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and 
[[CreateMap]] expressions.
 */
-object SimplifyCreateStructOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformExpressionsUp {
-  // push down field extraction
+object SimplifyExtractValueOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { 
case p =>
+p.transformExpressionsUp {
+  // Remove redundant field extraction.
   case GetStructField(createNamedStructLike: CreateNamedStructLike, 
ordinal, _) =>
 createNamedStructLike.valExprs(ordinal)
-}
-  }
-}
 
-/**
-* push down operations into [[CreateArray]].
-*/
-object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformExpressionsUp {
-  // push down field selection (array of structs)
+  // Remove redundant array indexing.
   case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
--- End diff --

Done


---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-09 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173561415
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -22,32 +22,24 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
-* push down operations into [[CreateNamedStructLike]].
+* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and 
[[CreateMap]] expressions.
 */
--- End diff --

Done.


---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-08 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173323846
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
   .analyze
 comparePlans(Optimizer execute rel, expected)
   }
+
+  test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
+val structRel = relation
+  .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 
0, None) as "foo")
+  .groupBy($"foo")("1").analyze
+val structExpected = relation
+  .select('nullable_id as "foo")
+  .groupBy($"foo")("1").analyze
+comparePlans(Optimizer execute structRel, structExpected)
+
+// If nullable attributes aren't used in the 'expected' plans, the 
array and map test
+// cases fail because array and map indexing can return null so the 
output attribute
--- End diff --

Done, thanks. I filed SPARK-23634 to fix this. Out of interest, why does 
`AttributeReference` cache the nullability of its referent? Is it because 
comparison is too expensive to do if you have to follow a level of indirection 
to get to the original attribute? 


---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-08 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173268999
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
   .analyze
 comparePlans(Optimizer execute rel, expected)
   }
+
+  test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
+val structRel = relation
+  .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 
0, None) as "foo")
+  .groupBy($"foo")("1").analyze
+val structExpected = relation
+  .select('nullable_id as "foo")
+  .groupBy($"foo")("1").analyze
+comparePlans(Optimizer execute structRel, structExpected)
+
+// If nullable attributes aren't used in the 'expected' plans, the 
array and map test
+// cases fail because array and map indexing can return null so the 
output attribute
--- End diff --

@cloud-fan I looked again at this briefly this morning. The issue is that 
it's the `AttributeReference` in the top-level `Aggregate`'s 
`groupingExpressions` that has inconsistent nullability. 

The `AttributeReference` in the original plan was created with `nullable=true`, 
before optimization. At that point its nullability is effectively fixed, unless 
the optimizer dereferences the attribute reference and realises that its target 
is no longer nullable. 
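
A tiny sketch of what I mean, using the Catalyst constructor directly (default arguments assumed from the era's API):

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Nullability is baked in when the reference is created...
val ref = AttributeReference("foo", LongType, nullable = true)()
// ...so even if the optimizer later rewrites whatever `ref` points at into a
// non-nullable expression, ref.nullable stays true unless the reference itself
// is rewritten or the plan is re-analyzed.
```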



---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-07 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173013094
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
   .analyze
 comparePlans(Optimizer execute rel, expected)
   }
+
+  test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
+val structRel = relation
+  .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 
0, None) as "foo")
+  .groupBy($"foo")("1").analyze
+val structExpected = relation
+  .select('nullable_id as "foo")
+  .groupBy($"foo")("1").analyze
+comparePlans(Optimizer execute structRel, structExpected)
+
+// If nullable attributes aren't used in the 'expected' plans, the 
array and map test
+// cases fail because array and map indexing can return null so the 
output attribute
--- End diff --

Thanks, that's plenty of information to get started - I'll dig into it.


---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-07 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r173007679
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -331,4 +330,31 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
   .analyze
 comparePlans(Optimizer execute rel, expected)
   }
+
+  test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
+val structRel = relation
+  .select(GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 
0, None) as "foo")
+  .groupBy($"foo")("1").analyze
+val structExpected = relation
+  .select('nullable_id as "foo")
+  .groupBy($"foo")("1").analyze
+comparePlans(Optimizer execute structRel, structExpected)
+
+// If nullable attributes aren't used in the 'expected' plans, the 
array and map test
+// cases fail because array and map indexing can return null so the 
output attribute
--- End diff --

It's a good question! I'm not too familiar with how nullability is marked 
and unmarked during planning. My understanding is roughly that the analyzer 
resolves all the plan's expressions and in doing so marks attributes as 
nullable or not. After that it's not clear that the optimizer revisits any of 
those nullability decisions. Is there an optimizer pass which should make 
nullability marking more precise? 


---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-07 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r172996211
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -22,32 +22,24 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
-* push down operations into [[CreateNamedStructLike]].
+* Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and 
[[CreateMap]] expressions.
 */
 object SimplifyCreateStructOps extends Rule[LogicalPlan] {
--- End diff --

Good point, done.


---




[GitHub] spark pull request #20687: [SPARK-23500][SQL] Fix complex type simplificatio...

2018-03-07 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r172992262
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -331,4 +330,30 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
   .analyze
 comparePlans(Optimizer execute rel, expected)
   }
+
+  test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
+// If nullable attributes aren't used, the array and map test cases 
fail because array
+// and map indexing can return null so the output is marked nullable.
--- End diff --

The optimization works either way, but in (for example) the map case, `m1` 
is marked as nullable in the original plan because presumably 
`GetMapValue(CreateMap(...))` can return `null` if the key is not in the map. 

So for the expected plan to compare the same as the original, it has to be 
reading a nullable attribute - otherwise the plans don't pass `comparePlans`.  
I moved and reworded the comment to hopefully clarify this a bit.

There's an opportunity to fix this up again after the rule completes (since 
some attributes could be marked too conservatively as nullable). Do you think 
that's something we should pursue for this PR?
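
For illustration, a minimal repro of the nullability behavior described above (assumes a SparkSession named `spark`):

```
import org.apache.spark.sql.functions._

// A lookup into a freshly built map can miss, so the output column is nullable
// even though spark.range's `id` column is not.
val df = spark.range(1).select(map(lit("k"), col("id")).getItem("absent").as("m1"))
println(df.schema("m1").nullable)   // true
```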


---




[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...

2018-03-06 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20687
  
This is failing because of SPARK-23606, which seems unrelated (I haven't been 
able to trigger it in local builds, at least). 


---




[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...

2018-03-06 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20687
  
retest this please


---




[GitHub] spark issue #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statist...

2018-03-05 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20740
  
argh, wrong PR, sorry for retest-spam.


---




[GitHub] spark issue #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statist...

2018-03-05 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20740
  
Retest this please


---




[GitHub] spark pull request #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !...

2018-03-05 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/20740

[SPARK-23604][SQL] Change Statistics.isEmpty to !Statistics.hasNonNul…

…lValue

## What changes were proposed in this pull request?

Parquet 1.9 will change the semantics of Statistics.isEmpty slightly
to reflect if the null value count has been set. That breaks a
timestamp interoperability test that cares only about whether there
are column values present in the statistics of a written file for an
INT96 column. Fix by using Statistics.hasNonNullValue instead.

## How was this patch tested?

Unit tests continue to pass against Parquet 1.8, and also pass against
a Parquet build including PARQUET-1217.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23604

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20740.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20740


commit b86c0bee16f85ad1f7d4e3351647ee5d0582d005
Author: Henry Robinson <henry@...>
Date:   2018-03-05T20:59:53Z

[SPARK-23604][SQL] Change Statistics.isEmpty to !Statistics.hasNonNullValue

## What changes were proposed in this pull request?

Parquet 1.9 will change the semantics of Statistics.isEmpty slightly
to reflect if the null value count has been set. That breaks a
timestamp interoperability test that cares only about whether there
are column values present in the statistics of a written file for an
INT96 column. Fix by using Statistics.hasNonNullValue instead.

## How was this patch tested?

Unit tests continue to pass against Parquet 1.8, and also pass against
a Parquet build including PARQUET-1217.
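
A rough sketch of the shape of the change (`conf` and `parquetFile` are placeholders; `hasNonNullValue` and `isEmpty` are the Parquet `Statistics` methods named above):

```
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Check that the INT96 column recorded values, rather than relying on the
// definition of "empty" statistics, which changes in Parquet 1.9.
val footer = ParquetFileReader.readFooter(conf, new Path(parquetFile))
val columnStats = footer.getBlocks.get(0).getColumns.get(0).getStatistics
assert(columnStats.hasNonNullValue)   // previously: assert(!columnStats.isEmpty)
```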




---




[GitHub] spark pull request #20687: [SPARK-25000][SQL] Fix complex type simplificatio...

2018-03-05 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20687#discussion_r172300096
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -331,4 +331,24 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
   .analyze
 comparePlans(Optimizer execute rel, expected)
   }
+
+  test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
+val structRel = relation
+  .select(GetStructField(CreateNamedStruct(Seq("att1", 'id)), 0, None) 
as "foo")
+  .select('foo).analyze
--- End diff --

Thanks for the pointer. I replaced the projection with an aggregation.


---




[GitHub] spark issue #20687: [SPARK-25000][SQL] Fix complex type simplification rules...

2018-02-28 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20687
  
retest this please


---




[GitHub] spark pull request #20687: [SPARK-25000][SQL] Fix complex type simplificatio...

2018-02-27 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/20687

[SPARK-25000][SQL] Fix complex type simplification rules to apply to entire 
plan

## What changes were proposed in this pull request?

Complex type simplification optimizer rules were not applied to the
entire plan, just the expressions reachable from the root node. This
patch fixes the rules to transform the entire plan.

## How was this patch tested?

New unit test + ran sql / core tests.
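
A compressed sketch of that difference (Catalyst rule API of the time assumed; `simplify` stands in for the rewrites spelled out in the rule):

```
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// One of the rule's rewrites, as a stand-in.
val simplify: PartialFunction[Expression, Expression] = {
  case GetStructField(s: CreateNamedStructLike, ordinal, _) => s.valExprs(ordinal)
}

// Old shape: only the root node's expressions are rewritten.
def applyOld(plan: LogicalPlan): LogicalPlan = plan.transformExpressionsUp(simplify)

// New shape: every node in the plan has its expressions rewritten.
def applyNew(plan: LogicalPlan): LogicalPlan =
  plan transform { case p => p.transformExpressionsUp(simplify) }
```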


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-25000

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20687.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20687


commit f446fa2482f2ccdfa6fec7243bb25b2e232da5fa
Author: Henry Robinson <henry@...>
Date:   2018-02-28T00:42:17Z

[SPARK-25000][SQL] Fix complex type simplification rules to apply to entire 
plan

## What changes were proposed in this pull request?

Complex type simplification optimizer rules were not applied to the
entire plan, just the expressions reachable from the root node. This
patch fixes the rules to transform the entire plan.

## How was this patch tested?

New unit test + sql / core tests.




---




[GitHub] spark pull request #20443: [SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkD...

2018-01-30 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/20443

[SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFrame in R comment



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark SPARK-23157

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20443.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20443


commit 2441e030a5d20ceb6b1c0068e0adf5586c254bb8
Author: Henry Robinson <henry@...>
Date:   2018-01-30T22:49:36Z

[SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFrame in R comment




---




[GitHub] spark pull request #20429: [SPARK-23157][SQL] Explain restriction on column ...

2018-01-29 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/20429

[SPARK-23157][SQL] Explain restriction on column expression in withColumn()

## What changes were proposed in this pull request?

It's not obvious from the comments that any added column must be a
function of the dataset that we are adding it to. Add a comment to
that effect to Scala, Python and R Data* methods.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark SPARK-23157

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20429.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20429


commit 18f30863a67e1ad49be87765badab69647e45d7a
Author: Henry Robinson <henry@...>
Date:   2018-01-29T21:51:00Z

[SPARK-23157][SQL] Explain restriction on column expression in withColumn()

It's not obvious from the comments that any added column must be a
function of the dataset that we are adding it to. Add a comment to
that effect to Scala, Python and R Data* methods.
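
An illustrative example of the restriction (`df` and `other` are assumed DataFrames that each have a "value" column):

```
// The added column must be derived from the DataFrame it is added to.
val ok = df.withColumn("doubled", df("value") * 2)

// This fails analysis, because the column comes from a different Dataset:
// df.withColumn("bad", other("value"))
```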




---




[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...

2018-01-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20355#discussion_r163455727
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -78,4 +78,20 @@ class FileBasedDataSourceSuite extends QueryTest with 
SharedSQLContext {
   }
 }
   }
+
+  // Separate test case for text-based formats that support multiLine as 
an option.
+  Seq("json", "csv", "text").foreach { format =>
--- End diff --

That sounds good - my main concern is to make sure that both 
`multiLine=true` _and_ `multiLine=false` have coverage with a space in the 
name, since they are such different paths. I'll keep the change that adds a 
space to `nameWithSpecialChars`, but otherwise have the tests as you suggest - 
let me know what you think of the next patch!
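
Roughly the two paths I mean, sketched out (the path is made up; assumes a SparkSession named `spark`):

```
// Same file name containing a space, read once per multiLine setting.
val path = "/tmp/sp%c hars/part.json"
Seq(true, false).foreach { multiLine =>
  spark.read.option("multiLine", multiLine).json(path).show()
}
```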


---




[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...

2018-01-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20355#discussion_r163422934
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -68,13 +68,16 @@ class FileBasedDataSourceSuite extends QueryTest with 
SharedSQLContext {
   }
 
   allFileBasedDataSources.foreach { format =>
-test(s"SPARK-22146 read files containing special characters using 
$format") {
-  val nameWithSpecialChars = s"sp%chars"
-  withTempDir { dir =>
-val tmpFile = s"$dir/$nameWithSpecialChars"
-spark.createDataset(Seq("a", 
"b")).write.format(format).save(tmpFile)
-val fileContent = spark.read.format(format).load(tmpFile)
-checkAnswer(fileContent, Seq(Row("a"), Row("b")))
+test(s"SPARK-22146 / SPARK-23148 read files containing special 
characters using $format") {
+  val nameWithSpecialChars = s"sp%c hars"
+  Seq(true, false).foreach { multiline =>
--- End diff --

Sounds good to me.


---




[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...

2018-01-23 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20355#discussion_r163411267
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
 ---
@@ -172,6 +172,14 @@ class TextSuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("SPARK-23148: test for spaces in file names") {
--- End diff --

In the end, to reduce code duplication, I made it so that orc and parquet run 
the multiline cases as well (I tried to find a neat way to run multiline only 
when the format was csv, text or json without adding a separate test case, but 
it just complicated things). Let me know if you'd rather I split this into two 
test cases to avoid running the two redundant cases with orc / parquet.


---




[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...

2018-01-22 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20355#discussion_r163146077
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
 ---
@@ -172,6 +172,14 @@ class TextSuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("SPARK-23148: test for spaces in file names") {
--- End diff --

@HyukjinKwon good idea, thanks for pointing out that test - how about I 
just add a space to the special characters string, and have the existing test 
also test multiline on/off for text, csv and json?


---




[GitHub] spark pull request #20355: SPARK-23148: [SQL] Allow pathnames with special c...

2018-01-22 Thread henryr
GitHub user henryr opened a pull request:

https://github.com/apache/spark/pull/20355

SPARK-23148: [SQL] Allow pathnames with special characters for CSV / …

…JSON / text

## What changes were proposed in this pull request?

Fix for JSON and CSV data sources when file names include characters
that would be changed by URL encoding.

## How was this patch tested?

New unit tests for JSON, CSV and text suites


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/henryr/spark spark-23148

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20355


commit 98da4cd4b865ef8944a10a24ee709ed9ee0a4721
Author: Henry Robinson <henry@...>
Date:   2018-01-19T22:55:53Z

SPARK-23148: [SQL] Allow pathnames with special characters for CSV / JSON / 
text

## What changes were proposed in this pull request?

Fix for JSON and CSV data sources when file names include characters
that would be changed by URL encoding.

## How was this patch tested?

New unit tests for JSON, CSV and text suites




---




[GitHub] spark issue #20254: [SPARK-23062][SQL] Improve EXCEPT documentation

2018-01-16 Thread henryr
Github user henryr commented on the issue:

https://github.com/apache/spark/pull/20254
  
Thanks all for the pointers and feedback! I've removed the references to the 
behavior before 2.0, and the changes now just make it explicit that this is 
`EXCEPT DISTINCT`. (I appreciate that that's what `EXCEPT` means per ANSI, but 
the behavior change since 1.x has confused users I've spoken to, so it seems 
worthwhile to make the documentation as clear as possible.)
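
A toy example of the `EXCEPT DISTINCT` semantics (assumes `spark.implicits._` is in scope):

```
val left  = Seq(1, 1, 2, 3).toDF("n")
val right = Seq(3).toDF("n")
left.except(right).show()   // returns 1 and 2, each exactly once
```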



---



