[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-12-04 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23171
  
As @rxin said, if we introduce a separate expression for the switch-based 
approach, then we will need to modify other places. For example, 
`DataSourceStrategy$translateFilter`. So, integrating into `In` or `InSet` 
seems safer.

I think we can move the switch-based logic to `InSet` and make `InSet` 
responsible for picking the optimal execution path. We might also need to 
adjust the condition under which `In` is converted to `InSet`, as the right 
cutoff will most likely depend on the underlying data type. Comparing the 
if-else approach to the switch-based one, I saw noticeable improvements 
starting from 5 elements, whereas right now the conversion happens only for 
more than 10 elements.
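
To make that concrete, here is a purely hypothetical sketch (not code from this 
PR) of what a type-aware conversion condition could look like; the cutoff of 5 
is just the number I observed in the benchmarks:

```
import org.apache.spark.sql.types._

// Hypothetical helper: decide when rewriting In(values) into InSet pays off.
// For switch-friendly integral types the cutoff could be lower than the generic
// spark.sql.optimizer.inSetConversionThreshold (10 by default).
def shouldConvertToInSet(dataType: DataType, numValues: Int, genericThreshold: Int = 10): Boolean =
  dataType match {
    case ByteType | ShortType | IntegerType => numValues > 5  // assumed switch-based cutoff
    case _ => numValues > genericThreshold
  }
```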


---




[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-11-29 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23171
  
To sum up, I would set the goal of this PR as making `In` expressions as 
efficient as possible for bytes/shorts/ints. Then we can benchmark `In` 
vs `InSet` in [SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203) 
and try to come up with a solution for `InSet` in 
[SPARK-26204](https://issues.apache.org/jira/browse/SPARK-26204). By the time 
we solve [SPARK-26204](https://issues.apache.org/jira/browse/SPARK-26204), we 
will have a clear understanding of the pros and cons of `In` and `InSet` and 
will be able to determine the right thresholds.


---




[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-11-29 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23171
  
@dbtsai @mgaido91 I think we can come back to this question once 
[SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203) is resolved. 
That JIRA will give us enough information about each data type.


---




[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-11-29 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23171
  
@cloud-fan, yeah, let’s see if this PR is useful.

The original idea wasn’t to avoid fixing autoboxing in `InSet`. `In` was 
tested on 250 numbers to demonstrate O(1) time complexity on compact values and 
to highlight the problems with `InSet`. After this change, `In` will be faster 
than `InSet`, but that is not the goal. Overall, the intent was to have a tiny 
and straightforward change that optimizes `In` expressions even when the number 
of elements is below `spark.sql.optimizer.inSetConversionThreshold` and Spark 
does not use `InSet`.

Once we solve the autoboxing issues in `InSet`, we will need to benchmark 
against this approach. 
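
As a side note, a quick way to see which of the two expressions a query 
actually ends up with (a spark-shell sketch; the threshold value is just the 
current default):

```
// With fewer values than spark.sql.optimizer.inSetConversionThreshold (default 10),
// the optimized plan keeps an In filter; above the threshold it is rewritten to InSet.
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "10")

val df = spark.range(100).selectExpr("CAST(id AS int) AS i")
df.where("i IN (1, 2, 3, 4, 5)").explain(true)
df.where("i IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)").explain(true)
```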


---




[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-11-28 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23171
  
@gatorsmile @cloud-fan @dongjoon-hyun @viirya 


---




[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...

2018-11-28 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/23171

[SPARK-26205][SQL] Optimize In for bytes, shorts, ints

## What changes were proposed in this pull request?

This PR optimizes `In` expressions for byte, short, and integer types. It is a 
follow-up to PR #21442 from @dbtsai.  

Currently, `In` expressions are compiled into a sequence of if-else 
statements, which results in O(n) time complexity. `InSet` is an optimized 
version of `In` that is supposed to improve performance once the number of 
elements is large enough. However, `InSet` actually degrades performance in 
many cases due to various reasons (`InSet` will be handled in 
[SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203) and 
[SPARK-26204](https://issues.apache.org/jira/browse/SPARK-26204)).

The main idea of this PR is to make use of `tableswitch` and `lookupswitch` 
bytecode instructions to speed up `In` expressions. In short, we can improve 
our time complexity from O(n) to O(1) or at least O(log n) by using Java 
`switch` statements. We will have O(1) time complexity if our case values are 
compact and `tableswitch` can be used. Otherwise, `lookupswitch` will give us 
O(log n). See 
[here](https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10)
 and 
[here](https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch)
 for more information.
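
As an illustration only (this is not the code generated by the PR), the same 
two JVM instructions can be observed from a plain Scala match on integer 
literals:

```
import scala.annotation.switch

// Compact case values compile to a tableswitch: a jump table with O(1) dispatch.
def inCompact(v: Int): Boolean = (v: @switch) match {
  case 1 | 2 | 3 | 4 | 5 => true
  case _ => false
}

// Sparse case values compile to a lookupswitch: a sorted key table searched in O(log n).
def inSparse(v: Int): Boolean = (v: @switch) match {
  case 1 | 100 | 10000 | 1000000 => true
  case _ => false
}
```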

An important benefit of the proposed approach is that we do not have to pay 
the extra autoboxing cost that `InSet` incurs. As a consequence, we can 
substantially outperform `InSet` even on 250+ elements.

This PR does not cover long values as Java `switch` statements cannot be 
used on them. However, we can have a follow-up PR with an approach similar to 
binary search.
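
For longs, such a follow-up could, for example, probe a sorted primitive array 
(a hedged sketch, not part of this PR):

```
// A sorted Array[Long] avoids autoboxing; java.util.Arrays.binarySearch gives O(log n) lookups.
val sortedValues: Array[Long] = Array(42L, 1L, 123456789L, 1000L).sorted

def containsLong(v: Long): Boolean =
  java.util.Arrays.binarySearch(sortedValues, v) >= 0
```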

## How was this patch tested?

### Correctness

There is a new test that verifies the logic of the proposed optimization.

### Performance Benchmarks

The performance impact was measured by the benchmarks below (an appropriate 
part of them will be included in 
[SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203)).

```
  def compactBytesBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
    val name = s"$numItems compact bytes"
    val values = (1 to numItems).map(v => s"CAST($v AS tinyint)")
    val df = spark.range(1, numRows).select($"id".cast(ByteType))
    benchmark(name, df, values, numRows, minNumIters)
  }

  def nonCompactBytesBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
    val step = (Byte.MaxValue.toInt - Byte.MinValue.toInt) / numItems
    val maxValue = Byte.MinValue + numItems * step
    val rangeSize = maxValue - Byte.MinValue
    require(isLookupSwitch(rangeSize, numItems))
    val name = s"$numItems non-compact bytes"
    val values = (Byte.MinValue until maxValue by step).map(v => s"CAST($v AS tinyint)")
    val df = spark.range(1, numRows).select($"id".cast(ByteType))
    benchmark(name, df, values, numRows, minNumIters)
  }

  def compactShortsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
    val name = s"$numItems compact shorts"
    val values = (1 to numItems).map(v => s"CAST($v AS smallint)")
    val df = spark.range(1, numRows).select($"id".cast(ShortType))
    benchmark(name, df, values, numRows, minNumIters)
  }

  def nonCompactShortsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
    val step = (Short.MaxValue.toInt - Short.MinValue.toInt) / numItems
    val maxValue = Short.MinValue + numItems * step
    val rangeSize = maxValue - Short.MinValue
    require(isLookupSwitch(rangeSize, numItems))
    val name = s"$numItems non-compact shorts"
    val values = (Short.MinValue until maxValue by step).map(v => s"CAST($v AS smallint)")
    val df = spark.range(1, numRows).select($"id".cast(ShortType))
    benchmark(name, df, values, numRows, minNumIters)
  }

  def compactIntsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
    val name = s"$numItems compact ints"
    val values = (1 to numItems).map(v => 1000 + v)
    val df = spark.range(1, numRows).select($"id".cast(IntegerType))
    benchmark(name, df, values, numRows, minNumIters)
  }

  def nonCompactIntsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
    val step = (Int.MaxValue.toLong - Int.MinValue.toLong) / numItems
    val maxValue = Int.MinValue + numItems * step
    val rangeSize = maxValue - Int.MinValue
    require

[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...

2018-11-26 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23139#discussion_r236466962
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, Expression, If}
+import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, 
MapFilter, Or}
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.util.Utils
+
+
+/**
+ * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, 
if possible, in the search
+ * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an 
implicit Boolean operator
--- End diff --

I think the scope of this rule is a bit broader: for example, it also covers 
higher-order functions and all `If` and `CaseWhen` expressions. Would it make 
sense to replace `"in the search condition of the WHERE/HAVING/ON(JOIN) 
clauses"` with `"in predicates"`?


---




[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...

2018-11-26 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23139#discussion_r236467423
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, Expression, If}
+import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, 
MapFilter, Or}
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.util.Utils
+
+
+/**
+ * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, 
if possible, in the search
+ * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an 
implicit Boolean operator
+ * "(search condition) = TRUE". The replacement is only valid when 
`Literal(null, BooleanType)` is
--- End diff --

I am not sure I understand `"which contain an implicit Boolean operator 
"(search condition) = TRUE""`. Could you please elaborate a bit?


---




[GitHub] spark issue #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInPredicat...

2018-11-19 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23079
  
LGTM as well. 


---




[GitHub] spark pull request #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInP...

2018-11-18 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/23079#discussion_r234467085
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
 ---
@@ -298,6 +299,45 @@ class ReplaceNullWithFalseSuite extends PlanTest {
 testProjection(originalExpr = column, expectedExpr = column)
   }
 
+  test("replace nulls in lambda function of ArrayFilter") {
+val cond = GreaterThan(UnresolvedAttribute("e"), Literal(0))
--- End diff --

Test cases for `ArrayFilter` and `ArrayExists` seem to be identical. As we 
have those tests anyway, would it make sense to cover different lambda 
functions?


---




[GitHub] spark issue #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInPredicat...

2018-11-18 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/23079
  
@rednaxelafx I am glad the rule gets more adoption. Renaming also makes 
sense to me.

Shall we extend `ReplaceNullWithFalseEndToEndSuite` as well?


---




[GitHub] spark issue #22966: [SPARK-25965][SQL][TEST] Add avro read benchmark

2018-11-08 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/22966
  
I also think having a performance trend would be useful. I'll be glad to 
help with this effort.


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-31 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229705741
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -2585,4 +2585,45 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> 
"b")))
   }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever 
possible") {
+
+def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = 
df.queryExecution.executedPlan match {
+  case s: LocalTableScanExec => assert(s.rows.isEmpty)
+  case p => fail(s"$p is not LocalTableScanExec")
+}
+
+val df1 = Seq((1, true), (2, false)).toDF("l", "b")
+val df2 = Seq(2, 3).toDF("l")
+
+val q1 = df1.where("IF(l > 10, false, b AND null)")
+checkAnswer(q1, Seq.empty)
+checkPlanIsEmptyLocalScan(q1)
+
+val q2 = df1.where("CASE WHEN l < 10 THEN null WHEN l > 40 THEN false 
ELSE null END")
+checkAnswer(q2, Seq.empty)
+checkPlanIsEmptyLocalScan(q2)
+
+val q3 = df1.join(df2, when(df1("l") > df2("l"), 
lit(null)).otherwise(df1("b") && lit(null)))
+checkAnswer(q3, Seq.empty)
+checkPlanIsEmptyLocalScan(q3)
+
+val q4 = df1.where("IF(IF(b, null, false), true, null)")
+checkAnswer(q4, Seq.empty)
+checkPlanIsEmptyLocalScan(q4)
+
+val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
+checkAnswer(q5, Row(1) :: Row(1) :: Nil)
+q5.queryExecution.executedPlan.foreach { p =>
+  assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))
--- End diff --

You are right, this can pass if `ConvertToLocalRelation` is enabled. When I 
tested this check, I did not take into account that `SharedSparkSession` 
disables `ConvertToLocalRelation`, so the check worked correctly only because 
that rule was disabled. Let’s switch to tables. Thanks!
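
For reference, a minimal sketch of the "switch to tables" idea (assuming the 
usual `SQLTestUtils`/`QueryTest` helpers of this suite):

```
withTable("t") {
  // Back the data by a real table so the emptiness check does not depend on
  // whether ConvertToLocalRelation is enabled or excluded in the test session.
  Seq((1, true), (2, false)).toDF("l", "b").write.saveAsTable("t")

  val q = spark.table("t").where("IF(l > 10, false, b AND null)")
  checkAnswer(q, Seq.empty)
}
```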


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-30 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229449496
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, 
it transforms predicates
+ * in all [[If]] expressions as well as branch conditions in all 
[[CaseWhen]] expressions.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As this rule is not limited to conditions in [[Filter]] and [[Join]], 
arbitrary plans can
+ * benefit from it. For example, `Project(If(And(cond, Literal(null)), 
Literal(1), Literal(2)))`
+ * can be simplified into `Project(Literal(2))`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case cw @ CaseWhen(branches, _) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+cw.copy(branches = newBranches)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+case cw: CaseWhen if cw.dataType == BooleanType =>
+  val newBranches = cw.branches.map { case (cond, value) =>
+replaceNullWithFalse(cond) -> replaceNullWithFalse(value)
+  }
+  val newElseValue = cw.elseValue.map(replaceNullWithFalse)
+  CaseWhen(newBranches, newElseValue)
+case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType =>
+  If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), 
replaceNullWithFalse(falseVal))
--- End diff --

Let me know if I understood you correctly here.


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-30 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229449194
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -2578,4 +2578,45 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 Row ("abc", 1))
 }
   }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever 
possible") {
+
+def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = 
df.queryExecution.executedPlan match {
--- End diff --

I see, thanks.

So, you mean to use `withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> 
"") {...}` to ensure that `ConvertToLocalRelation` is not excluded?


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-30 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229445682
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, 
it transforms predicates
+ * in all [[If]] expressions as well as branch conditions in all 
[[CaseWhen]] expressions.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As this rule is not limited to conditions in [[Filter]] and [[Join]], 
arbitrary plans can
+ * benefit from it. For example, `Project(If(And(cond, Literal(null)), 
Literal(1), Literal(2)))`
+ * can be simplified into `Project(Literal(2))`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case cw @ CaseWhen(branches, _) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+cw.copy(branches = newBranches)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+case cw: CaseWhen if cw.dataType == BooleanType =>
--- End diff --

This case is also covered and tested in `"replace null in conditions of 
CaseWhen"`, `"replace null in conditions of CaseWhen inside another CaseWhen"`.


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-30 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229445313
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, 
it transforms predicates
+ * in all [[If]] expressions as well as branch conditions in all 
[[CaseWhen]] expressions.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As this rule is not limited to conditions in [[Filter]] and [[Join]], 
arbitrary plans can
+ * benefit from it. For example, `Project(If(And(cond, Literal(null)), 
Literal(1), Literal(2)))`
+ * can be simplified into `Project(Literal(2))`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case cw @ CaseWhen(branches, _) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+cw.copy(branches = newBranches)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+case cw: CaseWhen if cw.dataType == BooleanType =>
+  val newBranches = cw.branches.map { case (cond, value) =>
+replaceNullWithFalse(cond) -> replaceNullWithFalse(value)
+  }
+  val newElseValue = cw.elseValue.map(replaceNullWithFalse)
+  CaseWhen(newBranches, newElseValue)
+case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType =>
+  If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), 
replaceNullWithFalse(falseVal))
--- End diff --

This case is handled in `apply` and tested in `"replace null in predicates 
of If"`, `"replace null in predicates of If inside another If"`


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-30 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229442843
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, 
it transforms predicates
+ * in all [[If]] expressions as well as branch conditions in all 
[[CaseWhen]] expressions.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As this rule is not limited to conditions in [[Filter]] and [[Join]], 
arbitrary plans can
+ * benefit from it. For example, `Project(If(And(cond, Literal(null)), 
Literal(1), Literal(2)))`
+ * can be simplified into `Project(Literal(2))`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case cw @ CaseWhen(branches, _) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+cw.copy(branches = newBranches)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
--- End diff --

I like your snippet because it is clean. We also considered a similar 
approach. 

1. Unfortunately, it does not handle nested `If`/`CaseWhen` expressions as 
they are not `NullIntolerant`. For example, cases like `If(If(a > 1, 
FalseLiteral, Literal(null, _)), 1, 2)` will not be optimized if we remove 
the branches for `If`/`CaseWhen`.
2. If we just add one more branch to handle all `NullIntolerant` 
expressions, I am not sure it will give much benefit, as those expressions 
are transformed into `Literal(null, _)` by `NullPropagation` and we operate in 
the same batch.
3. As @gatorsmile said, we should be really careful with generalization. 
For example, `Not` is `NullIntolerant`, and `Not(null)` is transformed into 
`null` by `NullPropagation`. We need to ensure that we do not replace `null` 
inside `Not` and do not convert `Not(null)` into `Not(FalseLiteral)` (see the 
sketch below).

Therefore, the intention was to keep things simple to be safe.
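
To make the `Not` pitfall concrete, here is a minimal sketch that evaluates 
the Catalyst expressions directly (e.g., in a spark-shell with catalyst on the 
classpath); it is only an illustration, not code from this PR:

```
import org.apache.spark.sql.catalyst.expressions.{Literal, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.types.BooleanType

// Not(null) evaluates to null, while Not(false) evaluates to true, so rewriting
// the null child of Not would change the result of the predicate.
Not(Literal(null, BooleanType)).eval()  // null
Not(FalseLiteral).eval()                // true
```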



---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-29 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229133793
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -2578,4 +2578,45 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 Row ("abc", 1))
 }
   }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever 
possible") {
+
+def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = 
df.queryExecution.executedPlan match {
--- End diff --

Do we actually have a way to enable/disable `ConvertToLocalRelation`? 


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-29 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r229133550
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], 
predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case CaseWhen(branches, elseValue) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+CaseWhen(newBranches, elseValue)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>
+  val newBranches = cw.branches.map { case (cond, value) =>
+replaceNullWithFalse(cond) -> replaceNullWithFalse(value)
+  }
+  val newElseValue = cw.elseValue.map(replaceNullWithFalse)
+  CaseWhen(newBranches, newElseValue)
+case If(pred, trueVal, falseVal) if Seq(trueVal, 
falseVal).forall(isNullOrBoolean) =>
+  If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), 
replaceNullWithFalse(falseVal))
+case And(left, right) =>
--- End diff --

Could you elaborate a bit more on `null && false`?

I had in mind `AND(true, null)` and `OR(false, null)`, which are tricky. 
For example, if we use `AND(true, null)` in SELECT, we will get `null`. 
However, if we use it inside a `Filter` condition or an `If` predicate, it is 
semantically equivalent to `false` (e.g., see `If$eval`). Therefore, the 
proposed rule has a limited scope. I explored the source code and comments of 
`And`/`Or` trying to come up with an edge case that wouldn’t work, but I could 
not find one. To me, it seems safe because the rule is applied only to 
expressions that evaluate to `false` if the underlying expression is `null` 
(i.e., conditions in `Filter`/`Join`, predicates in `If`, conditions in 
`CaseWhen`). 

Please let me know if you have a particular case to test.
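
A small sketch of the difference I have in mind (spark-shell; only an 
illustration):

```
// In a projection, the three-valued logic is visible: the result is NULL ...
spark.range(1).selectExpr("true AND CAST(null AS boolean) AS v").collect()  // Array([null])

// ... but as a Filter condition the very same expression behaves like false.
spark.range(1).where("true AND CAST(null AS boolean)").count()  // 0
```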


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-28 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r228741884
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], 
predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case CaseWhen(branches, elseValue) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+CaseWhen(newBranches, elseValue)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>
+  val newBranches = cw.branches.map { case (cond, value) =>
+replaceNullWithFalse(cond) -> replaceNullWithFalse(value)
+  }
+  val newElseValue = cw.elseValue.map(replaceNullWithFalse)
+  CaseWhen(newBranches, newElseValue)
+case If(pred, trueVal, falseVal) if Seq(trueVal, 
falseVal).forall(isNullOrBoolean) =>
--- End diff --

Yep, I shortened this to stay within one line below. I can either rename `pred` 
to `p` or split line 783 into multiple lines.


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-28 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22857#discussion_r228741800
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
   flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further 
optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query 
optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], 
predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case CaseWhen(branches, elseValue) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+CaseWhen(newBranches, elseValue)
+}
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or 
`Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
--- End diff --

Also, that's the reason why we don't use `transformExpressionsDown`. We 
will stop the replacement as soon as we hit an expression that is not 
`CaseWhen`, `If`, `And`, `Or` or `Literal(null, _)`.  Therefore, 
`If(IsNull(Literal(null, _)))` won't be transformed.
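
A quick sketch of why stopping there matters (evaluating the expressions 
directly; illustration only):

```
import org.apache.spark.sql.catalyst.expressions.{IsNull, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.types.BooleanType

// IsNull(null) is true while IsNull(false) is false, so descending into IsNull
// and replacing its null child would flip the predicate.
IsNull(Literal(null, BooleanType)).eval()  // true
IsNull(FalseLiteral).eval()                // false
```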


---




[GitHub] spark issue #22857: [SPARK-25860][SQL] Replace Literal(null, _) with FalseLi...

2018-10-27 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/22857
  
@dbtsai @gatorsmile @cloud-fan could you guys, please, take a look?


---




[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...

2018-10-27 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/22857

[SPARK-25860][SQL] Replace Literal(null, _) with FalseLiteral whenever 
possible

## What changes were proposed in this pull request?

This PR proposes a new optimization rule that replaces `Literal(null, _)` 
with `FalseLiteral` in conditions of `Join` and `Filter`, in predicates of 
`If`, and in branch conditions of `CaseWhen`.

The idea is that some expressions evaluate to `false` if the underlying 
expression is `null` (as an example see `GeneratePredicate$create` or 
`doGenCode` and `eval` methods in `If` and `CaseWhen`). Therefore, we can 
replace `Literal(null, _)` with `FalseLiteral`, which can lead to more 
optimizations later on.

Let’s consider a few examples.

```
val df = spark.range(1, 100).select($"id".as("l"), ($"id" > 50).as("b"))
df.createOrReplaceTempView("t")
df.createOrReplaceTempView("p")
```

**Case 1**
```
spark.sql("SELECT * FROM t WHERE if(l > 10, false, NULL)").explain(true)

// without the new rule
…
== Optimized Logical Plan ==
Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- Filter if ((id#0L > 10)) false else null
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
*(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- *(1) Filter if ((id#0L > 10)) false else null
   +- *(1) Range (1, 100, step=1, splits=12)

// with the new rule
…
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3]

== Physical Plan ==
LocalTableScan <empty>, [l#2L, s#3]
```

**Case 2**
```
spark.sql("SELECT * FROM t WHERE CASE WHEN l < 10 THEN null WHEN l > 40 
THEN false ELSE null END”).explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- Filter CASE WHEN (id#0L < 10) THEN null WHEN (id#0L > 40) THEN false 
ELSE null END
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
*(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- *(1) Filter CASE WHEN (id#0L < 10) THEN null WHEN (id#0L > 40) THEN 
false ELSE null END
   +- *(1) Range (1, 100, step=1, splits=12)

// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3]

== Physical Plan ==
LocalTableScan <empty>, [l#2L, s#3]
```

**Case 3**
```
spark.sql("SELECT * FROM t JOIN p ON IF(t.l > p.l, null, 
false)").explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Join Inner, if ((l#2L > l#37L)) null else false
:- Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
:  +- Range (1, 100, step=1, splits=Some(12))
+- Project [id#0L AS l#37L, cast(id#0L as string) AS s#38]
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, if ((l#2L > l#37L)) null else 
false
:- *(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
:  +- *(1) Range (1, 100, step=1, splits=12)
+- BroadcastExchange IdentityBroadcastMode
   +- *(2) Project [id#0L AS l#37L, cast(id#0L as string) AS s#38]
  +- *(2) Range (1, 100, step=1, splits=12)


// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3, l#37L, s#38]
```

## How was this patch tested?

This PR comes with a set of dedicated tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-25860

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22857.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22857


commit 1d8fefd9b227c6aba50b7e012726ec292c75b5a1
Author: Anton Okolnychyi 
Date:   2018-10-23T09:09:23Z

[SPARK-25860][SQL] Replace Literal(null, _) with FalseLiteral whenever 
possible




---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2018-09-13 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
Hi @dongjoon-hyun. Yep, I'll close this one.


---




[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2018-09-13 Thread aokolnychyi
Github user aokolnychyi closed the pull request at:

https://github.com/apache/spark/pull/19193


---




[GitHub] spark pull request #21580: [SPARK-24575][SQL] Prohibit window expressions in...

2018-06-17 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/21580

[SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVING 
clauses

## What changes were proposed in this pull request?

As discussed 
[before](https://github.com/apache/spark/pull/19193#issuecomment-393726964), 
this PR prohibits window expressions inside WHERE and HAVING clauses.

## How was this patch tested?

This PR comes with a dedicated unit test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-24575

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21580.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21580


commit 9a07ea361eccfefe348db8a9b50acf3c68ec7a56
Author: aokolnychyi 
Date:   2018-06-17T16:56:40Z

[SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVING 
clauses




---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2018-05-31 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
@cloud-fan @hvanhovell I created PR #21473 that fixes StackOverflow.

Apart from that, I think we might have other potential problems.

**1. Window functions inside WHERE and HAVING**

Why such cases should be prohibited is described 
[here](https://stackoverflow.com/questions/13997177/why-no-windowed-functions-in-where-clauses).
Spark, on the other hand, does not handle this explicitly and will fail 
with non-descriptive exceptions.

```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.createTempView("t1")

spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").explain(true)
spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").show(false)

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot 
evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, 
int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$()))
at 
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at 
org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
...
```

```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.createTempView("t1")

spark.sql("SELECT t1.a, MAX(t1.b) FROM t1 GROUP BY t1.a HAVING RANK() 
OVER(ORDER BY t1.a) = 1").explain(true)
spark.sql("SELECT t1.a, MAX(t1.b) FROM t1 GROUP BY t1.a HAVING RANK() 
OVER(ORDER BY t1.a) = 1").show(false)

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot 
evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, 
int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$()))
at 
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at 
org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)

```
Shall this be explicitly validated?

**2. HAVING clause using the Dataset API**

It seems we cannot use HAVING with aggregate functions in the Dataset API 
if there is a window function in the same query.

The following query works correctly because ``ResolveAggregateFunctions`` 
applies once the complete plan is available, i.e., when ``where`` is called.

```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.groupBy('a).agg(max('b)).where(sum('b) === 5).show(false)
+---+------+
|a  |max(b)|
+---+------+
|1  |3     |
|5  |5     |
+---+------+
```

The query below, however, will fail even though it is a valid one (notice 
window functions).
```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.groupBy('a).agg(max('b), rank().over(window)).where(sum('b) === 
5).show(false)

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve '`b`' given input columns: [a, max(b), RANK() OVER (ORDER BY a ASC 
NULLS FIRST unspecifiedframe$())];;
'Filter (sum('b) = 5)
+- AnalysisBarrier
  +- Project [a#5, max(b)#14, RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$())#15]
 +- Project [a#5, max(b)#14, RANK() OVER (ORDER BY a ASC NULLS 
FIRST unspecifiedframe$())#15, RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$())#15]
+- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() 
OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())#15], [a#5 ASC NULLS FIRST]
   +- Aggregate [a#5], [a#5, max(b#6) AS max(b)#14]
  +- Project [_1#2 AS a#5, _2#3 AS b#6]
 +- LocalRelation [_1#2, _2#3]
```

It fails because ``ExtractWindowExpressions`` applies when you call 
``agg`` and not ``where``. At that point in time, you do not have the full plan 
and ``ExtractWindowExpressions`` will no

[GitHub] spark pull request #21473: [SPARK-21896][SQL] Fix StackOverflow caused by wi...

2018-05-31 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21473#discussion_r192234621
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1744,11 +1744,14 @@ class Analyzer(
*it into the plan tree.
*/
   object ExtractWindowExpressions extends Rule[LogicalPlan] {
-private def hasWindowFunction(projectList: Seq[NamedExpression]): 
Boolean =
-  projectList.exists(hasWindowFunction)
+private def hasWindowFunction(exprs: Seq[Expression]): Boolean =
+  exprs.exists(hasWindowFunction)
 
-private def hasWindowFunction(expr: NamedExpression): Boolean = {
+private def hasWindowFunction(expr: Expression): Boolean = {
   expr.find {
+case AggregateExpression(aggFunc, _, _, _) if 
hasWindowFunction(aggFunc.children) =>
--- End diff --

I have some doubts that this is the best place for this check. 
StackOverflow happens in ``extract``. We can also define a separate method and 
call it inside ``extract``. However, that method will share the same structure 
as ``hasWindowFunction``.


---




[GitHub] spark pull request #21473: [SPARK-21896][SQL] Fix StackOverflow caused by wi...

2018-05-31 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/21473

[SPARK-21896][SQL] Fix StackOverflow caused by window functions inside 
aggregate functions


## What changes were proposed in this pull request?

This PR explicitly prohibits window functions inside aggregates. Currently, 
this will cause StackOverflow during analysis. See PR #19193 for previous 
discussion.

## How was this patch tested?

This PR comes with a dedicated unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark 
fix-stackoverflow-window-funcs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21473.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21473


commit 646a96d75c12b2e3c6886bc0cc9743e7ba838c8a
Author: aokolnychyi 
Date:   2018-05-31T10:58:29Z

[SPARK-21896][SQL] Fix StackOverflow caused by window functions inside 
aggregate functions




---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2018-05-20 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
@hvanhovell @cloud-fan I think it would be safer to be consistent with 
other databases and what Spark does for nested aggregate functions. It is 
really simple to write a subquery to work around any problems.

As it is right now, Scenario 2 mentioned above gives a wrong result and 
Scenario 4 should be double-checked if we want to support nested window 
functions.

Let me know your opinion.


---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2018-05-19 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
I checked PostgreSQL (10.3), MySQL (8.0), and Hive (2.1.0). 

**1. PostgreSQL**

```
postgres=# CREATE TABLE t1 (c1 integer, c2 integer);
postgres=# INSERT INTO t1 VALUES (1, 2), (1, 3), (2,4), (5,5);
postgres=# SELECT c1, c2, ROW_NUMBER() OVER() as c3 FROM t1;
 c1 | c2 | c3 
----+----+----
  1 |  2 |  1
  1 |  3 |  2
  2 |  4 |  3
  5 |  5 |  4
(4 rows)
postgres=# SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
ERROR:  aggregate function calls cannot contain window function calls
LINE 1: SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
```

**2. MySQL**

```
mysql> CREATE TABLE t1 (c1 integer, c2 integer);
mysql> INSERT INTO t1 VALUES (1, 2), (1, 3), (2,4), (5,5);
mysql> SELECT c1, c2, ROW_NUMBER() OVER() FROM t1;
+------+------+---------------------+
| c1   | c2   | ROW_NUMBER() OVER() |
+------+------+---------------------+
|    1 |    2 |                   1 |
|    1 |    3 |                   2 |
|    2 |    4 |                   3 |
|    5 |    5 |                   4 |
+------+------+---------------------+
4 rows in set (0.00 sec)
mysql> SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
ERROR 3593 (HY000): You cannot use the window function 'row_number' in this 
context.'
```

**3. Hive**

```
hive> CREATE TABLE t1(c1 INT, c2 INT);
hive> INSERT INTO t1 VALUES (1, 2), (1, 3), (2,4), (5,5);
hive> SELECT c1, c2, ROW_NUMBER() OVER() as c3 FROM t1;
OK
5   5   1
2   4   2
1   3   3
1   2   4
hive> SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
FAILED: SemanticException [Error 10002]: Line 1:15 Invalid column reference 
'ROW_NUMBER': (possible column names are: c1, c2)
```

I will adapt the PR to prohibit window functions inside aggregates.


---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2018-04-06 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
Let me check other databases and come up with a summary.


---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2017-12-17 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
@hvanhovell here is a summary of tried scenarios:

```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
val window1 = Window.orderBy('a)
val window2 = Window.orderBy('a.desc)
```

**Scenario 1: An aggregate on top of a window expression (did not work 
before, looks OK now)**
```
df.groupBy().agg(max(rank().over(window1))).explain(true)
df.groupBy().agg(max(rank().over(window1))).show(false)

== Analyzed Logical Plan ==
max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int
Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#16]
+- Project [a#5, b#6, _we0#27, _we0#27]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
_we0#27], [a#5 ASC NULLS FIRST]
  +- Project [_1#2 AS a#5, _2#3 AS b#6]
 +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#16]
+- Project [_we0#27, _we0#27]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
_we0#27], [a#5 ASC NULLS FIRST]
  +- LocalRelation [a#5]

+-----------------------------------------------------------------+
|max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|
+-----------------------------------------------------------------+
|4                                                                |
+-----------------------------------------------------------------+
```

**Scenario 2: An aggregate with grouping expressions on top of a window 
expression**
TODO: Is the result wrong? What is expected?
```
df.groupBy('a).agg(max(rank().over(window1))).explain(true)
df.groupBy('a).agg(max(rank().over(window1))).show(false)

== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): 
int
Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC 
NULLS FIRST unspecifiedframe$()))#63]
+- Project [a#5, b#6, _we0#75, _we0#75]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
_we0#75], [a#5 ASC NULLS FIRST]
  +- Project [_1#2 AS a#5, _2#3 AS b#6]
 +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC 
NULLS FIRST unspecifiedframe$()))#63]
+- Project [a#5, _we0#75, _we0#75]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
_we0#75], [a#5 ASC NULLS FIRST]
  +- LocalRelation [a#5]

+---+-----------------------------------------------------------------+
|a  |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|
+---+-----------------------------------------------------------------+
|1  |1                                                                |
|2  |3                                                                |
|5  |4                                                                |
+---+-----------------------------------------------------------------+
```

**Scenario 3: A normal aggregate, an aggregate on top of a window 
expression, a window expression on top of an aggregate in one query**
This is resolved in two steps.
```
df.groupBy('a).agg(max(rank().over(window1)), sum('b), 
sum(sum('b)).over(window2)).explain(true)
df.groupBy('a).agg(max(rank().over(window1)), sum('b), 
sum(sum('b)).over(window2)).show(false)

== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): 
int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST 
unspecifiedframe$()): bigint
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#116, sum(b)#117L, sum(sum(b)) OVER (ORDER BY a DESC NULLS 
LAST unspecifiedframe$())#118L]
+- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#116, sum(b)#117L, _w0#137L, sum(sum(b)) OVER (ORDER BY a 
DESC NULLS LAST unspecifiedframe$())#118L, sum(sum(b)) OVER (ORDER BY a DESC 
NULLS LAST unspecifiedframe$())#118L]
   +- Window [sum(_w0#137L) windowspecdefinition(a#5 DESC NULLS LAST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L], [a#5 
DESC NULLS LAST]
  +- Aggregate [a#5], [a#5, max(_we0#133) AS max(RANK() OVER (ORDER B

[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-12-14 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
I am not sure we can infer ``a == b`` if ``a in (0, 2, 3, 4)`` and ``b in 
(0, 2, 3, 4)``. 

table 'a'
```
a1 a2
1  2
3  3
4  5
```

table 'b'
```
b1 b2
1  -1
2  -2
3  -4
```

```
SELECT * FROM a, b WHERE a1 in (1, 2) AND b1 in (1, 2)
// 1 2 1 -1
// 1 2 2 -2
```
```
SELECT * FROM a JOIN b ON a.a1 = b.b1 WHERE a1 in (1, 2) AND b1 in (1, 2)
// 1 2 1 -1
```

Am I missing anything?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-12-13 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
Yeah, correct. So, we should revert then.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-12-13 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
I took a look at ``JoinSelection``. It seems we will not get 
``BroadcastHashJoin`` or ``ShuffledHashJoin`` if we revert this rule.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-12-13 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
Sure, if you guys think it does not give any performance benefits, then 
let's revert it.

I also had similar concerns but my understanding was that having an inner 
join with some equality condition can be beneficial during the generation of a 
physical plan. In other words, Spark should be able to select a more efficient 
join implementation. I am not sure how it is right now but previously you could 
have only ``BroadcastNestedLoopJoin`` or ``CartesianProduct`` without any 
equality condition. Again, that was my assumption based on what I remember. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-12-12 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19193#discussion_r156495899
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1920,7 +1927,34 @@ class Analyzer(
 
   case p: LogicalPlan if !p.childrenResolved => p
 
-  // Aggregate without Having clause.
+  // Extract window expressions from aggregate functions. There might 
be an aggregate whose
+  // aggregate function contains a window expression as a child, which 
we need to extract.
+  // e.g., df.groupBy().agg(max(rank().over(window))
+  case a @ Aggregate(groupingExprs, aggregateExprs, child)
+if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
+   a.expressions.forall(_.resolved) =>
+
+val windowExprAliases = new ArrayBuffer[NamedExpression]()
+val newAggregateExprs = aggregateExprs.map { expr =>
+  expr.transform {
--- End diff --

Hmm, do you actually mean something like this?
```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
val window1 = Window.orderBy('a)
df.groupBy('a).agg(max(sum(sum('b)).over(window1))).explain(true)
df.groupBy('a).agg(max(sum(sum('b)).over(window1))).show(false)
```

```
== Analyzed Logical Plan ==
a: int, max(sum(sum(b)) OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$())): bigint
Aggregate [a#5], [a#5, max(_we0#22L) AS max(sum(sum(b)) OVER (ORDER BY a 
ASC NULLS FIRST unspecifiedframe$()))#16L]
+- Project [a#5, b#6, _we0#22L, _we0#22L]
   +- Window [sum(sum(cast(b#6 as bigint))) windowspecdefinition(a#5 ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS _we0#22L], [a#5 ASC NULLS FIRST]
  +- Project [_1#2 AS a#5, _2#3 AS b#6]
 +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [a#5], [a#5, max(_we0#22L) AS max(sum(sum(b)) OVER (ORDER BY a 
ASC NULLS FIRST unspecifiedframe$()))#16L]
+- Project [a#5, _we0#22L, _we0#22L]
   +- Window [sum(sum(cast(b#6 as bigint))) windowspecdefinition(a#5 ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS _we0#22L], [a#5 ASC NULLS FIRST]
  +- LocalRelation [a#5, b#6]
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-12-12 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19193#discussion_r156493072
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1920,7 +1927,34 @@ class Analyzer(
 
   case p: LogicalPlan if !p.childrenResolved => p
 
-  // Aggregate without Having clause.
+  // Extract window expressions from aggregate functions. There might 
be an aggregate whose
+  // aggregate function contains a window expression as a child, which 
we need to extract.
+  // e.g., df.groupBy().agg(max(rank().over(window))
+  case a @ Aggregate(groupingExprs, aggregateExprs, child)
+if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
+   a.expressions.forall(_.resolved) =>
+
+val windowExprAliases = new ArrayBuffer[NamedExpression]()
+val newAggregateExprs = aggregateExprs.map { expr =>
+  expr.transform {
--- End diff --

Thanks for looking into this. I am not sure I fully understood "it will 
push the regular aggregate into the underlying window". Could you please 
elaborate?

This is what I tried:
```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
val window1 = Window.orderBy('a)
val window2 = Window.orderBy('a.desc)

df.groupBy('a).agg(max(rank().over(window1)), sum('b), 
sum(sum('b)).over(window2)).explain(true)
df.groupBy('a).agg(max(rank().over(window1)), sum('b), 
sum(sum('b)).over(window2)).show(false)
```

It produced the following plans:

```
== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): 
int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST 
unspecifiedframe$()): bigint
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS 
LAST unspecifiedframe$())#21L]
+- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#19, sum(b)#20L, _w0#40L, sum(sum(b)) OVER (ORDER BY a 
DESC NULLS LAST unspecifiedframe$())#21L, sum(sum(b)) OVER (ORDER BY a DESC 
NULLS LAST unspecifiedframe$())#21L]
   +- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 
DESC NULLS LAST]
  +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a 
ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS 
sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L]
 +- Project [a#5, b#6, _we0#36, _we0#36]
+- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
_we0#36], [a#5 ASC NULLS FIRST]
   +- Project [_1#2 AS a#5, _2#3 AS b#6]
  +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST 
unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS 
LAST unspecifiedframe$())#21L]
+- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 
DESC NULLS LAST]
   +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a 
ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS 
sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L]
  +- Project [a#5, b#6, _we0#36, _we0#36]
 +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
_we0#36], [a#5 ASC NULLS FIRST]
+- LocalRelation [a#5, b#6]

```

The result was:

```

+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|a  |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|sum(b)|sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())|
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|5  |4                                                                |5     |5                                                                |
|2  |3                                                                |4     |9                                                                |
|1  |1

[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2017-12-08 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193
  
@gatorsmile @cloud-fan could you provide any input?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-11-30 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r154164912
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -27,6 +27,8 @@ object ExpressionSet {
 expressions.foreach(set.add)
 set
   }
+
+  val empty: ExpressionSet = ExpressionSet(Nil)
--- End diff --

I thought that writing ``ExpressionSet.empty`` would be more readable than 
``ExpressionSet(Nil)``. Usually, mutable collections have ``def empty()`` and 
immutable ones have separate objects that represent empty collections (e.g., 
``Nil``, ``Stream.Empty``). I defined ``val empty`` since ``ExpressionSet`` is 
immutable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-11-28 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r153420088
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that eliminates CROSS joins by inferring join conditions from 
propagated constraints.
+ *
+ * The optimization is applicable only to CROSS joins. For other join 
types, adding inferred join
+ * conditions would potentially shuffle children as child node's 
partitioning won't satisfy the JOIN
+ * node's requirements which otherwise could have.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
+ */
+object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  eliminateCrossJoin(plan)
+} else {
+  plan
+}
+  }
+
+  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join@Join(leftPlan, rightPlan, Cross, None) =>
+  val leftConstraints = 
join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
+  val rightConstraints = 
join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
+  val inferredJoinPredicates = inferJoinPredicates(leftConstraints, 
rightConstraints)
+  val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
+  if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, 
joinConditionOpt) else join
+  }
+
+  private def inferJoinPredicates(
+  leftConstraints: Set[Expression],
+  rightConstraints: Set[Expression]): Set[EqualTo] = {
+
+// iterate through the left constraints and build a hash map that 
points semantically
+// equivalent expressions into attributes
+val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
+val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { 
case (map, constraint) =>
+  constraint match {
+case EqualTo(attr: Attribute, expr: Expression) =>
+  updateEquivalenceMap(map, attr, expr)
+case EqualTo(expr: Expression, attr: Attribute) =>
+  updateEquivalenceMap(map, attr, expr)
+case _ => map
+  }
+}
+
+// iterate through the right constraints and infer join conditions 
using the equivalence map
+rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, 
constraint) =>
+  constraint match {
+case EqualTo(attr: Attribute, expr: Expression) =>
+  appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
+case EqualTo(expr: Expression, attr: Attribute) =>
+  appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
+case _ => joinConditions
+  }
+}
+  }
+
+  private def updateEquivalenceMap(
+  equivalenceMap: Map[SemanticExpression, Set[Attribute]],
+  attr: Attribute,
+  expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
+
+val equivalentAttrs = equivalenceMap.getOrElse(expr, 
Set.empty[Attribute])
+if (equivalentAttrs.contains(attr)) {
+  equivalenceMap
+} else {
+  equivalenceMap.updated(expr, equivalentAttrs + attr)
+}
+  }
+
+  private def appendJoinConditions(
+  attr: Attribute,
+  expr: Expression,
+  equivalenceMap: Map[SemanticExpression, Set[Attribute]],
+  joinConditions: Set[EqualTo]): Set[EqualTo] = {
+
+equivalenceMap.get(expr) match {
+  case Some(equivalentAttrs) => joinConditions ++ 
equivalentAttrs.map(EqualTo(attr, _))
+  case None => joinConditions
+}
+  }
+
+  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the 
same expressions
+  implicit class SemanticExpression(private val expr: Expression) {
--- End diff --

I am afraid ``ExpressionSet`` will not help here since we need to map a 
semantically equivalent expression into a set of attributes that correspond to 
it. It is not enough to check if there is an equivalent expression. Therefore, 
``EquivalentExpressions`` and ``ExpressionSet`` are not applicable (as far as I 
see). 

``EquivalentExpressionMap`` from the previous comment assumes the following 
workflow:

```
val equivalentExression

[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-11-27 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r153329031
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that eliminates CROSS joins by inferring join conditions from 
propagated constraints.
+ *
+ * The optimization is applicable only to CROSS joins. For other join 
types, adding inferred join
+ * conditions would potentially shuffle children as child node's 
partitioning won't satisfy the JOIN
+ * node's requirements which otherwise could have.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
+ */
+object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  eliminateCrossJoin(plan)
+} else {
+  plan
+}
+  }
+
+  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join@Join(leftPlan, rightPlan, Cross, None) =>
+  val leftConstraints = 
join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
+  val rightConstraints = 
join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
+  val inferredJoinPredicates = inferJoinPredicates(leftConstraints, 
rightConstraints)
+  val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
+  if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, 
joinConditionOpt) else join
+  }
+
+  private def inferJoinPredicates(
+  leftConstraints: Set[Expression],
+  rightConstraints: Set[Expression]): Set[EqualTo] = {
+
+// iterate through the left constraints and build a hash map that 
points semantically
+// equivalent expressions into attributes
+val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
+val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { 
case (map, constraint) =>
+  constraint match {
+case EqualTo(attr: Attribute, expr: Expression) =>
+  updateEquivalenceMap(map, attr, expr)
+case EqualTo(expr: Expression, attr: Attribute) =>
+  updateEquivalenceMap(map, attr, expr)
+case _ => map
+  }
+}
+
+// iterate through the right constraints and infer join conditions 
using the equivalence map
+rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, 
constraint) =>
+  constraint match {
+case EqualTo(attr: Attribute, expr: Expression) =>
+  appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
+case EqualTo(expr: Expression, attr: Attribute) =>
+  appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
+case _ => joinConditions
+  }
+}
+  }
+
+  private def updateEquivalenceMap(
+  equivalenceMap: Map[SemanticExpression, Set[Attribute]],
+  attr: Attribute,
+  expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
+
+val equivalentAttrs = equivalenceMap.getOrElse(expr, 
Set.empty[Attribute])
+if (equivalentAttrs.contains(attr)) {
+  equivalenceMap
+} else {
+  equivalenceMap.updated(expr, equivalentAttrs + attr)
+}
+  }
+
+  private def appendJoinConditions(
+  attr: Attribute,
+  expr: Expression,
+  equivalenceMap: Map[SemanticExpression, Set[Attribute]],
+  joinConditions: Set[EqualTo]): Set[EqualTo] = {
+
+equivalenceMap.get(expr) match {
+  case Some(equivalentAttrs) => joinConditions ++ 
equivalentAttrs.map(EqualTo(attr, _))
+  case None => joinConditions
+}
+  }
+
+  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the 
same expressions
+  implicit class SemanticExpression(private val expr: Expression) {
--- End diff --

Using a Set instead of a List might be beneficial in the proposed rule. 
What about the following?

```
class EquivalentExpressionMap {

  private val equivalenceMap = mutable.HashMap.empty[SemanticallyEqualExpr, 
mutable.Set[Expression]]

  def put(expression: Expression, equivalentExpression: Expression): Unit = 
{
val equivalentExpressions = equivalenceMap.getOrElse(expression, 
mutable.Set.empty)
if (!equivalentExpressi
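
A self-contained sketch of the ``EquivalentExpressionMap`` idea along these 
lines (the wrapper and the exact member names are assumptions, not the final 
implementation):

```
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.Expression

// Keys the map by semantic equality instead of structural/reference equality.
case class SemanticallyEqualExpr(expr: Expression) {
  override def hashCode(): Int = expr.semanticHash()
  override def equals(other: Any): Boolean = other match {
    case SemanticallyEqualExpr(e) => expr.semanticEquals(e)
    case _ => false
  }
}

class EquivalentExpressionMap {

  private val equivalenceMap =
    mutable.HashMap.empty[SemanticallyEqualExpr, mutable.Set[Expression]]

  def put(expression: Expression, equivalentExpression: Expression): Unit = {
    val equivalentExpressions =
      equivalenceMap.getOrElseUpdate(SemanticallyEqualExpr(expression), mutable.Set.empty)
    equivalentExpressions += equivalentExpression
  }

  def get(expression: Expression): Set[Expression] =
    equivalenceMap.getOrElse(SemanticallyEqualExpr(expression), mutable.Set.empty).toSet
}
```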

[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-11-26 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r153066992
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that eliminates CROSS joins by inferring join conditions from 
propagated constraints.
+ *
+ * The optimization is applicable only to CROSS joins. For other join 
types, adding inferred join
+ * conditions would potentially shuffle children as child node's 
partitioning won't satisfy the JOIN
+ * node's requirements which otherwise could have.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
+ */
+object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  eliminateCrossJoin(plan)
+} else {
+  plan
+}
+  }
+
+  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join@Join(leftPlan, rightPlan, Cross, None) =>
+  val leftConstraints = 
join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
+  val rightConstraints = 
join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
+  val inferredJoinPredicates = inferJoinPredicates(leftConstraints, 
rightConstraints)
+  val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
+  if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, 
joinConditionOpt) else join
+  }
+
+  private def inferJoinPredicates(
+  leftConstraints: Set[Expression],
+  rightConstraints: Set[Expression]): Set[EqualTo] = {
+
+// iterate through the left constraints and build a hash map that 
points semantically
+// equivalent expressions into attributes
+val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
+val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { 
case (map, constraint) =>
+  constraint match {
+case EqualTo(attr: Attribute, expr: Expression) =>
+  updateEquivalenceMap(map, attr, expr)
+case EqualTo(expr: Expression, attr: Attribute) =>
+  updateEquivalenceMap(map, attr, expr)
+case _ => map
+  }
+}
+
+// iterate through the right constraints and infer join conditions 
using the equivalence map
+rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, 
constraint) =>
+  constraint match {
+case EqualTo(attr: Attribute, expr: Expression) =>
+  appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
+case EqualTo(expr: Expression, attr: Attribute) =>
+  appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
+case _ => joinConditions
+  }
+}
+  }
+
+  private def updateEquivalenceMap(
+  equivalenceMap: Map[SemanticExpression, Set[Attribute]],
+  attr: Attribute,
+  expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
+
+val equivalentAttrs = equivalenceMap.getOrElse(expr, 
Set.empty[Attribute])
+if (equivalentAttrs.contains(attr)) {
+  equivalenceMap
+} else {
+  equivalenceMap.updated(expr, equivalentAttrs + attr)
+}
+  }
+
+  private def appendJoinConditions(
+  attr: Attribute,
+  expr: Expression,
+  equivalenceMap: Map[SemanticExpression, Set[Attribute]],
+  joinConditions: Set[EqualTo]): Set[EqualTo] = {
+
+equivalenceMap.get(expr) match {
+  case Some(equivalentAttrs) => joinConditions ++ 
equivalentAttrs.map(EqualTo(attr, _))
+  case None => joinConditions
+}
+  }
+
+  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the 
same expressions
+  implicit class SemanticExpression(private val expr: Expression) {
--- End diff --

@gatorsmile 

I think we just need the case class inside ``EquivalentExpressions`` since 
we have to map all semantically equivalent expressions into a set of attributes 
(as opposed to mapping an expression into a set of equivalent expressions). 

I see two ways to go:

1. Expose the case class inside ``EquivalentExpressions`` with minimum 
changes in the code base (e.g., using a companion object):


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-11-22 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r152660385
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The 
optimization is applicable
+ * only to CROSS joins.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  inferJoinConditions(plan)
+} else {
+  plan
+}
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join @ Join(left, right, Cross, conditionOpt) =>
+  val leftConstraints = 
join.constraints.filter(_.references.subsetOf(left.outputSet))
+  val rightConstraints = 
join.constraints.filter(_.references.subsetOf(right.outputSet))
+  val inferredJoinPredicates = inferJoinPredicates(leftConstraints, 
rightConstraints)
+
+  val newConditionOpt = conditionOpt match {
+case Some(condition) =>
+  val existingPredicates = splitConjunctivePredicates(condition)
+  val newPredicates = findNewPredicates(inferredJoinPredicates, 
existingPredicates)
+  if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), 
condition)) else None
+case None =>
+  inferredJoinPredicates.reduceOption(And)
+  }
+  if (newConditionOpt.isDefined) Join(left, right, Inner, 
newConditionOpt) else join
--- End diff --

@gatorsmile Thanks for getting back.

``CheckCartesianProducts`` identifies a join of type ``Inner | LeftOuter | 
RightOuter | FullOuter`` as a cartesian product if there is no join predicate 
that has references to both relations.

If we agree to ignore joins of type Cross that have a condition (in this 
PR), then the use case in this 
[discussion](https://github.com/apache/spark/pull/18692#discussion_r144466472) 
is no longer possible (even if you remove t1.col1 >= t2.col1). Correct? 
``PushPredicateThroughJoin`` will push ``t1.col1 = t1.col2 + t2.col2 and 
t2.col1 = t1.col2 + t2.col2`` into the join condition, the proposed rule 
will not infer anything, and the 
final join will be of type Cross with a condition that covers both 
relations. According to the logic of ``CheckCartesianProducts``, it is not 
considered to be a cartesian product (since there exists a join predicate that 
covers both relations, e.g. ``t1.col1 = t1.col2 + t2.col2``).

So, if I have a confirmation that we need to consider only joins of type 
Cross and without any join conditions, I can update the PR accordingly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-11-21 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
@SimonBin The initial solution handled your case but then there was a 
decision to restrict the proposed rule to cross joins only. You can find the 
reason in this 
[comment](https://github.com/apache/spark/pull/18692#issuecomment-326694822) or 
in the code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-10-18 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r145498671
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,79 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The 
optimization is applicable
+ * only to CROSS joins. For other join types, adding inferred join 
conditions would potentially
+ * shuffle children as child node's partitioning won't satisfy the JOIN 
node's requirements
+ * which otherwise could have.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  inferJoinConditions(plan)
+} else {
+  plan
+}
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join @ Join(left, right, Cross, conditionOpt) =>
+
+  val rightEqualToPredicates = join.constraints.collect {
--- End diff --

I thought about improving the time complexity here via a hash map with 
semantic equals/hashcode. However, this idea would require a wrapper, so I kept 
it as it is. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-10-15 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r144722742
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The 
optimization is applicable
+ * only to CROSS joins.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  inferJoinConditions(plan)
+} else {
+  plan
+}
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join @ Join(left, right, Cross, conditionOpt) =>
+  val leftConstraints = 
join.constraints.filter(_.references.subsetOf(left.outputSet))
+  val rightConstraints = 
join.constraints.filter(_.references.subsetOf(right.outputSet))
--- End diff --

@gengliangwang Yeah, makes sense. So, ``PushPredicateThroughJoin`` would 
push the WHERE clause into the join, and the proposed rule would infer ``t1.col1 
= t2.col1`` and change the join type to INNER. As a result, the final join 
condition will be ``t1.col1 = t2.col1 and t1.col1 >= t2.col1 and (t1.col1 = 
t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2)``. Am I right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...

2017-09-17 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19252
  
@gatorsmile thanks for the feedback. I also covered 
``TruncateTableCommand`` with additional tests. However, I see some strange 
behavior while creating a test for ``AlterTableAddPartitionCommand``.

```
sql(s"CREATE TABLE t1 (col1 int, col2 int) USING PARQUET")
sql(s"INSERT INTO TABLE t1 SELECT 1, 2")
sql(s"INSERT INTO TABLE t1 SELECT 2, 4")
sql("SELECT * FROM t1").show()
+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   2|   4|
+----+----+

sql(s"CREATE TABLE t2 (col1 int, col2 int) USING PARQUET PARTITIONED BY 
(col1)")
sql(s"INSERT INTO TABLE t2 SELECT 1, 2")
sql(s"INSERT INTO TABLE t2 SELECT 2, 4")
sql("SELECT * FROM t2").show()
+----+----+
|col2|col1|
+----+----+
|   2|   4|
|   1|   2|
+----+----+
```

Why are the results different? Is it a bug?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats ...

2017-09-16 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/19252

[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are 
updated. As a consequence, existing sessions will use the cached information 
even though it is not valid anymore. Consider the example below. 

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 
4 will correctly update the metadata inside the catalog but will NOT invalidate 
the cache.

By the way, ``spark.sql("analyze table tab1 compute statistics")`` between 
step 3 and step 4 would also solve the problem.
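
Another workaround (a sketch) is to refresh the table explicitly after the 
append, so that the cached catalog relation is dropped and rebuilt with the 
updated statistics:

```
spark.range(100).write.mode("append").saveAsTable("tab1")
spark.catalog.refreshTable("tab1")
spark.sql("explain cost select distinct * from tab1").show(false)
```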

## How was this patch tested?

Current and additional unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-21969

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19252

----
commit ba963b46cd2917315bc2bd0cf237c7d9f79e9d65
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-09-16T11:57:52Z

[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-09-11 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/19193

[WIP][SPARK-21896][SQL] Fix Stack Overflow when window function is nested 
inside an aggregate function

## What changes were proposed in this pull request?

This WIP PR contains a prototype that fixes a StackOverflowError in 
``Analyzer``. In short, Spark cannot handle window expressions inside 
aggregate functions. The root cause of the bug is the inability of 
``ExtractWindowExpressions`` to extract window expressions from aggregate 
functions.

```
val df = Seq((1, 2), (1, 3), (2, 4)).toDF("a", "b")
val window = Window.orderBy("a")

df.groupBy().agg(max(rank().over(window))) // does not work
df.select(rank().over(window).alias("rank")).agg(max("rank")) // works
```
It would be nice to get some initial feedback since there are alternative 
ways of solving this problem.

## How was this patch tested?

This PR represents only an idea and was tested manually in several 
scenarios.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-21896

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19193


commit c14aa2ff6161de7d45869d91e53b0b25b18ad2dd
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-09-10T19:04:38Z

[SPARK-21896][SQL] Fix Stack Overflow when window function is nested inside 
an aggregate function




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-09-06 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r137343500
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The 
optimization is applicable
+ * only to CROSS joins.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  inferJoinConditions(plan)
+} else {
+  plan
+}
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join @ Join(left, right, Cross, conditionOpt) =>
+  val leftConstraints = 
join.constraints.filter(_.references.subsetOf(left.outputSet))
+  val rightConstraints = 
join.constraints.filter(_.references.subsetOf(right.outputSet))
+  val inferredJoinPredicates = inferJoinPredicates(leftConstraints, 
rightConstraints)
+
+  val newConditionOpt = conditionOpt match {
+case Some(condition) =>
+  val existingPredicates = splitConjunctivePredicates(condition)
+  val newPredicates = findNewPredicates(inferredJoinPredicates, 
existingPredicates)
+  if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), 
condition)) else None
+case None =>
+  inferredJoinPredicates.reduceOption(And)
+  }
+  if (newConditionOpt.isDefined) Join(left, right, Inner, 
newConditionOpt) else join
--- End diff --

And what about CROSS joins with join conditions? Not sure if they will 
benefit from the proposed rule, but it is better to ask.

```
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t2")
val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 ON t1.col1 >= t2.col1 
WHERE t1.col1 = 1 AND t2.col1 = 1")
df.explain(true)
== Optimized Logical Plan ==
Join Cross, (col1#40 >= col1#42)
:- Filter (isnotnull(col1#40) && (col1#40 = 1))
:  +- Relation[col1#40,col2#41] parquet
+- Filter (isnotnull(col1#42) && (col1#42 = 1))
   +- Relation[col1#42,col2#43] parquet
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-09-06 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r137343433
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The 
optimization is applicable
+ * only to CROSS joins.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a 
= 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
--- End diff --

I also thought about this, but `InferFiltersFromConstraints` does not change 
the join types it considers. Therefore, I kept them separate. In addition, I 
thought about renaming it to `EliminateCrossJoin`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-08-31 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
@gatorsmile what is our decision here? Shall we wait until SPARK-21652 is 
resolved? In the meantime, I can add some tests and see how the proposed rule 
works together with all others. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18909: [MINOR][SQL] Additional test case for CheckCartesianProd...

2017-08-13 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18909
  
@gatorsmile sure, this PR is only about tests; I was just wondering what is 
planned regarding cross joins with inequality conditions.

I borrowed several tests from PR #16762 and added additional ones. As I 
mentioned, there is a small overlap between the existing tests and the proposed 
ones, but they are defined at different levels.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18909: [MINOR][SQL] Additional test case for CheckCartesianProd...

2017-08-11 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18909
  
@gatorsmile I took a look at both PRs. 

I quickly scanned PR #14866 and did not find tests for existence joins. 
Also, `SQLConf.CROSS_JOINS_ENABLED = true` is checked only for `left_outer`. 
So, the proposed tests slightly improve the coverage. 

PR #16762 checks everything from a different perspective than the proposed 
rules and has some unique scenarios compared to PR #14866. The main question 
that PR #16762 raises is about, for instance, inner joins with inequality 
conditions. As far as I understood, the ability to detect such cartesian 
products was the motivation for moving the check away from the Optimizer. Is it 
still planned? Could this also be done by modifying the existing rule in the 
Optimizer? Currently, it only checks that there are conditions which reference 
both sides. Instead, it could rely on equality predicates, right?
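
A sketch of the stricter check I have in mind (an assumption on my side, not 
the current rule): treat a join as a cartesian product unless at least one 
equality predicate references both sides.

```
import org.apache.spark.sql.catalyst.expressions.{EqualTo, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.Join

// Hypothetical helper for illustration only.
object EquiConditionCheck extends PredicateHelper {
  // True if at least one top-level equality predicate spans both join sides.
  def hasEquiCondition(join: Join): Boolean =
    join.condition.exists { cond =>
      splitConjunctivePredicates(cond).exists {
        case EqualTo(l, r) =>
          (l.references.subsetOf(join.left.outputSet) &&
            r.references.subsetOf(join.right.outputSet)) ||
          (l.references.subsetOf(join.right.outputSet) &&
            r.references.subsetOf(join.left.outputSet))
        case _ => false
      }
    }
}
```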


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18909: [MINOR][SQL] Additional test case for CheckCartes...

2017-08-10 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/18909

[MINOR][SQL] Additional test case for CheckCartesianProducts rule

## What changes were proposed in this pull request?

While going through the optimization rules and their test coverage, I did not 
find any tests for `CheckCartesianProducts` in the Catalyst folder. So, I 
decided to create a new test suite. Once I finished, I found a test in 
`JoinSuite` for this functionality, so feel free to discard this change if it 
does not make much sense. The proposed test suite covers a few additional use 
cases.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark check-cartesian-join-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18909


commit 98b54cad45b638515d36966a1e64955f4f1531d1
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-08-09T21:13:02Z

[MINOR][SQL] Additional test case for CheckCartesianProducts rule




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

2017-08-08 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
@gatorsmile I updated the rule to cover cross join cases. Regarding the 
case with the redundant condition mentioned by you, I opened 
[SPARK-21652](https://issues.apache.org/jira/browse/SPARK-21652). It is an 
existing issue and is not caused by the proposed rule. BTW, I can try to fix it 
once we agree on a solution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

2017-08-01 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18692#discussion_r130662925
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +152,72 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The 
optimization is applicable
+ * only to inner joins.
+ *
+ * For instance, if there is a join, where the left relation has 'a = 1' 
and the right relation
+ * has 'b = 1', then the rule infers 'a = b' as a join predicate. Only 
semantically new predicates
+ * are appended to the existing join condition.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  inferJoinConditions(plan)
+} else {
+  plan
+}
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan 
transform {
+case join @ Join(left, right, Inner, conditionOpt) =>
--- End diff --

I also thought about this but decided to start with a smaller scope. The 
motivation was that `"SELECT * FROM t1, t2"` is resolved into an Inner Join and 
one has to explicitly use the Cross Join syntax to allow cartesian products. I 
was not sure if it was OK to replace an explicit Cross Join with a join of a 
different type. Semantically, we can have `InnerLike` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

2017-07-31 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
@gatorsmile I took a look at the case above. Indeed, the proposed rule 
triggers this issue but only indirectly. In the example above, the optimizer 
will never reach a fixed point. Please find my investigation below.

```
... 

// The new rule infers correct join predicates
Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
:- Filter ((col1#32 = col2#33) && (col1#32 = 1))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (col#34 = 1)
   +- Relation[col#34] parquet

// InferFiltersFromConstraints adds more filters
Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
:- Filter col2#33 = 1) && isnotnull(col1#32)) && isnotnull(col2#33)) && 
((col1#32 = col2#33) && (col1#32 = 1)))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (isnotnull(col#34) && (col#34 = 1))
   +- Relation[col#34] parquet

// ConstantPropagation is applied
Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
!:- Filter (col2#33 = 1) && isnotnull(col2#33)) && isnotnull(col1#32)) 
&& ((1 = col2#33) && (col1#32 = 1))) 
 :  +- Relation[col1#32,col2#33] parquet
 +- Filter (isnotnull(col#34) && (col#34 = 1))
+- Relation[col#34] parquet  

// (Important) InferFiltersFromConstraints infers (col1#32 = col2#33), 
which is added to the join condition.
Join Inner, ((col1#32 = col2#33) && ((col2#33 = col#34) && (col1#32 = 
col#34)))
!:- Filter (col2#33 = 1) && isnotnull(col2#33)) && isnotnull(col1#32)) 
&& ((1 = col2#33) && (col1#32 = 1))) 
 :  +- Relation[col1#32,col2#33] parquet
 +- Filter (isnotnull(col#34) && (col#34 = 1))
+- Relation[col#34] parquet

 // PushPredicateThroughJoin pushes down (col1#32 = col2#33) and then 
CombineFilters produces
Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
!:- Filter isnotnull(col1#32) && (col2#33 = 1)) && isnotnull(col2#33)) 
&& ((1 = col2#33) && (col1#32 = 1))) && (col2#33 = col1#32))
 :  +- Relation[col1#32,col2#33] parquet
 +- Filter (isnotnull(col#34) && (col#34 = 1))
+- Relation[col#34] parquet 
 

```
After that, `ConstantPropagation` replaces `(col2#33 = col1#32)` as `(1 = 
1)`, `BooleanSimplification` removes `(1 = 1)`, `InferFiltersFromConstraints` 
infers `(col2#33 = col1#32)` again and the procedure repeats forever. Since 
`InferFiltersFromConstraints` is the last optimization rule, we have the 
redundant condition mentioned by you. The Optimizer without the new rule will 
also not converge on the following query:

```
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq(1, 2).toDF("col").write.saveAsTable("t2")
spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND 
t1.col1 = t2.col AND t1.col2 = t2.col").explain(true)
```
Correct me if I am wrong, but it seems like an issue with the existing 
rules.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18740: [SPARK-21538][SQL] Attribute resolution inconsist...

2017-07-27 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18740#discussion_r129911780
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
---
@@ -1304,6 +1304,15 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
   assert(rlike3.count() == 0)
 }
   }
+
+  test("SPARK-21538: Attribute resolution inconsistency in Dataset API") {
+val df = spark.range(1).withColumnRenamed("id", "x")
+checkAnswer(df.sort(col("id")), df.sort("id"))
+checkAnswer(df.sort($"id"), df.sort("id"))
+checkAnswer(df.sort('id), df.sort("id"))
+checkAnswer(df.orderBy('id), df.sort("id"))
+checkAnswer(df.orderBy("id"), df.sort("id"))
--- End diff --

Indeed, looks much better. I appreciate the explanation and will take this 
into account in the future. I will update the test in a minute, thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18740: [SPARK-21538][SQL] Attribute resolution inconsist...

2017-07-26 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/18740

[SPARK-21538][SQL] Attribute resolution inconsistency in the Dataset API

## What changes were proposed in this pull request?

This PR contains a tiny update that removes an attribute resolution 
inconsistency in the Dataset API. The following example is taken from the 
ticket description:

```
spark.range(1).withColumnRenamed("id", "x").sort(col("id"))  // works
spark.range(1).withColumnRenamed("id", "x").sort($"id")  // works
spark.range(1).withColumnRenamed("id", "x").sort('id) // works
spark.range(1).withColumnRenamed("id", "x").sort("id") // fails with:
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" 
among (x);
```
The above `AnalysisException` happens because the last case calls 
`Dataset.apply()` to convert strings into columns, which triggers attribute 
resolution. To make the API consistent between overloaded methods, this PR 
delays the resolution and constructs columns directly.
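
For illustration, the idea boils down to something like the following hypothetical
helper (a sketch, not the actual diff): build unresolved columns with
`functions.col`, so that resolution is deferred to the analyzer exactly as in the
`Column`-based overloads.

```
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

// Hypothetical helper mirroring what the String-based overloads should do:
// map names to unresolved Columns instead of resolving them eagerly
// against the current schema via Dataset.apply().
def sortByNames[T](ds: Dataset[T], names: String*): Dataset[T] =
  ds.sort(names.map(col): _*)
```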

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-21538

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18740.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18740


commit b0fbd5ee98de0ed725d96e63943da934150f6c3e
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-07-26T19:56:44Z

[SPARK-21538][SQL] Fix attribute resolution inconsistency in the Dataset API




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

2017-07-24 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
@gatorsmile thanks for the input. Let me check that I understood everything
correctly. So, I keep it as a separate rule that is applied only if constraint
propagation is enabled. Inside the rule, I rely on `join.constraints` to infer the
join conditions. The remaining logic stays the same. Correct?

I guess that `InferFiltersFromConstraints` can be used as a guideline.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

2017-07-21 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18692
  
@cloud-fan which rule do you mean? `PushPredicateThroughJoin` seems to be 
the closest by logic but it has a slightly different purpose and does not cover 
this use case. In fact, I used the proposed rule in conjunction with 
`PushPredicateThroughJoin` in the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Detect joind conditions via fi...

2017-07-20 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/18692

[SPARK-21417][SQL] Detect joind conditions via filter expressions

## What changes were proposed in this pull request?

This PR adds an optimization rule that infers additional join conditions from the
filter expressions that are specified.

For example, 
`SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND t2.col2 = 1` 
can be transformed into 
`SELECT * FROM t1 JOIN t2 ON t1.col1 = t2.col2 WHERE t1.col1 = 1 AND 
t2.col2 = 1`, since both columns are constrained to the same constant and 
`t1.col1 = t2.col2` can therefore be inferred as a join condition. A minimal way 
to observe the effect is shown below.
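
A hypothetical repro (it assumes a Spark session and reuses the table names
t1/t2 from the example):

```
// With the rule enabled, the optimized plan should contain an equi-join on
// t1.col1 = t2.col2 instead of a cartesian product.
import spark.implicits._

Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq((3, 4)).toDF("col1", "col2").write.saveAsTable("t2")
spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND t2.col2 = 1").explain(true)
```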

Refer to the corresponding ticket and tests for more details.

## How was this patch tested?

This patch comes with a new test suite to cover the implemented logic.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-21417

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18692


commit e67d4d3c0bdf5cac4c6b17b50314984a2a6378d2
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-07-18T18:49:16Z

[SPARK-21417][SQL] Detect joind conditions via filter expressions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18583: [SPARK-21332][SQL] Incorrect result type inferred for so...

2017-07-17 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18583
  
Can we, please, trigger this one more time?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18583: [SPARK-21332][SQL] Incorrect result type inferred...

2017-07-10 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/18583

[SPARK-21332][SQL] Incorrect result type inferred for some decimal 
expressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the
DecimalPrecision rule. Previously, the expressions were transformed top-down,
which led to incorrect result types when decimal expressions had other decimal
expressions as their operands. The root cause of this issue was that outer nodes
were visited before their children. Consider the example below:

```
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
val df = spark.createDataFrame(rdd, inputSchema)

// Works correctly since no nested decimal expression is involved
// Expected result type: (26, 6) * (26, 6) = (38, 12)
df.select($"col" * $"col").explain(true)
df.select($"col" * $"col").printSchema()

// Gives a wrong result since there is a nested decimal expression that 
should be visited first
// Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * 
(26, 6) = (38, 18)
df.select($"col" * $"col" * $"col").explain(true)
df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output: 

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * 
col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * 
col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)


// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), 
DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * 
col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), 
DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * 
col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```
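
For reference, the expected result types in the comments above follow the decimal
multiplication rule that, as far as I can tell, DecimalPrecision applies: result
precision = p1 + p2 + 1 and scale = s1 + s2, both capped at 38. A tiny standalone
sketch of that arithmetic (not the rule's actual code):

```
object DecimalMultiplicationTypes extends App {
  // Sketch of the bounded multiplication rule: precision = p1 + p2 + 1,
  // scale = s1 + s2, both capped at the maximum precision of 38.
  def multiplyType(p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) =
    (math.min(p1 + p2 + 1, 38), math.min(s1 + s2, 38))

  println(multiplyType(26, 6, 26, 6))   // (38,12) -> decimal(38,12)
  println(multiplyType(38, 12, 26, 6))  // (38,18) -> the expected nested result
}
```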

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to 
cover previously failing scenarios.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-21332

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18583


commit 7b35ed2e497f3660e100ad103244beea767f2a73
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-07-09T12:54:48Z

[SPARK-21332][SQL] Fix the incorrect result type inferred for some decimal 
expressions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18368: [SPARK-21102][SQL] Refresh command is too aggressive in ...

2017-07-03 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18368
  
@gatorsmile should be fixed now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18368: [SPARK-21102][SQL] Make refresh resource command less ag...

2017-06-27 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18368
  
@shaneknapp It seems that the build fails with an exception unrelated to the PR.
Therefore, I will just close this one and open a new one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18368: [SPARK-21102][SQL] Make refresh resource command less ag...

2017-06-21 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18368
  
@shaneknapp can we trigger this one more time, please?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18368: [SPARK-21102][SQL] Make refresh resource command ...

2017-06-20 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/18368

[SPARK-21102][SQL] Make refresh resource command less aggressive in p…

### Idea

This PR adds validation to REFRESH sql statements. Currently, users can 
specify whatever they want as resource path. For example, spark.sql("REFRESH ! 
$ !") will be executed without any exceptions.

### Implementation 

I am not sure that my current implementation is optimal, so any feedback is
appreciated. My first idea was to make the grammar as strict as possible.
Unfortunately, there were some problems. I tried the approach below:

SqlBase.g4
```
...
| REFRESH TABLE tableIdentifier                                     #refreshTable
| REFRESH resourcePath                                              #refreshResource
...

resourcePath
    : STRING
    | (IDENTIFIER | number | nonReserved | '/' | '-')+ // other symbols can be added if needed
    ;
```
It is not flexible enough and requires explicitly listing all possible symbols.
Therefore, I came up with the current approach, which is implemented in the code.

Let me know your opinion on which one is better.
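
For reference, a rough sketch of the validation idea in the current approach (not
necessarily the exact code in this PR): reject unquoted resource paths that
contain whitespace, so that `REFRESH ! $ !` fails at parse time.

```
// Sketch only: validate an unquoted resource path before building the
// refresh command; quoted STRING paths would be accepted as-is.
def validateResourcePath(path: String): Unit = {
  if (path.exists(_.isWhitespace)) {
    throw new IllegalArgumentException(
      s"REFRESH statements cannot contain whitespace in an unquoted path: '$path'")
  }
}
```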

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-21102

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18368


commit fc2b7c02fab7f570ae3ca080ae1c2c9502300de7
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-06-19T18:17:18Z

[SPARK-21102][SQL] Make refresh resource command less aggressive in parsing




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18252: [SPARK-17914][SQL] Fix parsing of timestamp strings with...

2017-06-12 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18252
  
@wzhfy @rxin @ueshin can someone, please, merge this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18252: [SPARK-17914][SQL] Fix parsing of timestamp strin...

2017-06-10 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18252#discussion_r121252397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -399,13 +399,13 @@ object DateTimeUtils {
   digitsMilli += 1
 }
 
-if (!justTime && isInvalidDate(segments(0), segments(1), segments(2))) {
-  return None
+while (digitsMilli > 6) {
--- End diff --

@wzhfy done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18252: [SPARK-17914][SQL] Fix parsing of timestamp strin...

2017-06-10 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/18252#discussion_r121251811
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -32,7 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * Helper functions for converting between internal and external date and 
time representations.
  * Dates are exposed externally as java.sql.Date and are represented 
internally as the number of
  * dates since the Unix epoch (1970-01-01). Timestamps are exposed 
externally as java.sql.Timestamp
- * and are stored internally as longs, which are capable of storing 
timestamps with 100 nanosecond
+ * and are stored internally as longs, which are capable of storing 
timestamps with microsecond
--- End diff --

Sure, but the previous comment, which was introduced in 
[this](https://github.com/apache/spark/commit/6b7f2ceafdcbb014791909747c2210b527305df9)
 commit, was no longer correct. The logic was changed in 
[this](https://github.com/apache/spark/commit/a290814877308c6fa9b0f78b1a81145db7651ca4)
 commit and now it is up to microseconds.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18252: [SPARK-17914][SQL] Fix parsing of timestamp strings with...

2017-06-09 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/18252
  
@ueshin good point, thanks. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18252: [SPARK-17914][SQL] Fix parsing of timestamp strin...

2017-06-09 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/18252

[SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds

The PR contains a tiny change to fix the way Spark parses string literals into
timestamps. Currently, some timestamps that contain nanoseconds are corrupted
during the conversion from UTF8Strings into the internal representation of
timestamps.

Consider the following example:
```
spark.sql("SELECT cast('2015-01-02 00:00:00.1' as 
TIMESTAMP)").show(false)
+----------------------------------------+
|CAST(2015-01-02 00:00:00.1 AS TIMESTAMP)|
+----------------------------------------+
|2015-01-02 00:00:00.01                  |
+----------------------------------------+
```
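
For intuition, the conversion the parser should effectively perform on the
fractional part is the following (an illustrative helper, not the patch itself):

```
// '.1' means 100000 microseconds, and digits beyond microsecond precision
// (i.e. nanoseconds) should simply be dropped.
def fractionToMicros(fraction: String): Int =
  fraction.padTo(6, '0').take(6).toInt

assert(fractionToMicros("1") == 100000)
assert(fractionToMicros("123456789") == 123456)
```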

The fix was tested with existing tests. Also, there is a new test to cover 
cases that did not work previously.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark-17914

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18252


commit 2f232a7bda28fb42759ee35923044f886a1ff19e
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date:   2017-06-08T18:52:14Z

[SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2017-01-24 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r97589440
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:typed_custom_aggregation$
+import java.io.Serializable;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.expressions.Aggregator;
+// $example off:typed_custom_aggregation$
+
+public class JavaUserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  public static class Employee implements Serializable {
+private String name;
+private long salary;
+
+// Constructors, getters, setters...
+// $example off:typed_custom_aggregation$
+public String getName() {
+  return name;
+}
+
+public void setName(String name) {
+  this.name = name;
+}
+
+public long getSalary() {
+  return salary;
+}
+
+public void setSalary(long salary) {
+  this.salary = salary;
+}
+// $example on:typed_custom_aggregation$
+  }
+
+  public static class Average implements Serializable  {
+private long sum;
+private long count;
+
+// Constructors, getters, setters...
+// $example off:typed_custom_aggregation$
+public Average() {
+}
+
+public Average(long sum, long count) {
+  this.sum = sum;
+  this.count = count;
+}
+
+public long getSum() {
+  return sum;
+}
+
+public void setSum(long sum) {
+  this.sum = sum;
+}
+
+public long getCount() {
+  return count;
+}
+
+public void setCount(long count) {
+  this.count = count;
+}
+// $example on:typed_custom_aggregation$
+  }
+
+  public static class MyAverage extends Aggregator<Employee, Average, 
Double> {
+// A zero value for this aggregation. Should satisfy the property that 
any b + zero = b
+public Average zero() {
--- End diff --

@srowen `Average` is a Java bean that holds the current sum and count. It is
defined earlier. Here it represents a zero value. `MyAverage`, in turn, is the
actual aggregator that accepts instances of the `Employee` class, stores
intermediate results using an instance of `Average`, and produces a `Double` as
the result.

I can rename `MyAverage` to `MyAverageAggregator` if this makes things 
clearer. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-22 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93606051
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:typed_custom_aggregation$
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.SparkSession
+// $example off:typed_custom_aggregation$
+
+object UserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  case class Employee(name: String, salary: Long)
+  case class Average(var sum: Long, var count: Long)
+
+  object MyAverage extends Aggregator[Employee, Average, Double] {
+// A zero value for this aggregation. Should satisfy the property that 
any b + zero = b
+def zero: Average = Average(0L, 0L)
+// Combine two values to produce a new value. For performance, the 
function may modify `buffer`
+// and return it instead of constructing a new object
+def reduce(buffer: Average, employee: Employee): Average = {
+  buffer.sum += employee.salary
+  buffer.count += 1
+  buffer
+}
+// Merge two intermediate values
+def merge(b1: Average, b2: Average): Average = Average(b1.sum + 
b2.sum, b1.count + b2.count)
--- End diff --

@michalsenkyr It is not required to create a new object in the `merge` method.
One can modify the vars and return the existing object, just like in the `reduce`
method. However, it is less critical here since this method will be called on
pre-aggregated data and not for every element. On the one hand, I can apply the
same approach here as in the `reduce` method to make the example consistent. On
the other hand, the current code shows that it is not mandatory to modify the
vars. Probably a comment might help. I am not sure which approach is better, so I
am open to suggestions. A sketch of the mutating variant is shown below.
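
For illustration (a sketch, not part of the diff), the mutating variant, reusing
the `Average(var sum, var count)` case class from the example:

```
// Reuse b1 as the result buffer instead of allocating a new Average,
// mirroring what reduce() already does.
def merge(b1: Average, b2: Average): Average = {
  b1.sum += b2.sum
  b1.count += b2.count
  b1
}
```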


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-21 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93395745
  
--- Diff: docs/sql-programming-guide.md ---
@@ -382,6 +382,52 @@ For example:
 
 
 
+## Aggregations
+
+The [built-in DataFrames 
functions](api/scala/index.html#org.apache.spark.sql.functions$) mentioned 
+before provide such common aggregations as `count()`, `countDistinct()`, 
`avg()`, `max()`, `min()`, etc.
+While those functions are designed for DataFrames, Spark SQL also has 
type-safe versions for some of them in 

+[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$)
 and 
+[Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to 
work with strongly typed Datasets.
+Moreover, users are not limited to the predefined aggregate functions and 
can create their own.
--- End diff --

I also thought about this. In my view, it would be appropriate to have a separate
subsection before Aggregations to show how to apply predefined SQL functions,
including writing your own UDFs. That would be worth another pull request.
Alternatively, I can also try to extend this one to add an example of `max()` or
`min()`. @marmbrus what's your opinion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...

2016-12-20 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/16329
  
@marmbrus I have updated the pull request. The compiled docs can be found 
[here](https://aokolnychyi.github.io/spark-docs/sql-programming-guide.html). 

I did not manage to build the Java API docs. I believe the problem is in my local
installation. Therefore, I checked each URL manually; they should work once the
API docs are compiled. I will verify everything one more time in the nightly
build.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-19 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93019316
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:untyped_custom_aggregation$
+import org.apache.spark.sql.expressions.MutableAggregationBuffer
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+// $example off:untyped_custom_aggregation$
+
+object UserDefinedUntypedAggregation {
+
+  // $example on:untyped_custom_aggregation$
+  object MyAverage extends UserDefinedAggregateFunction {
+// Data types of input arguments
+def inputSchema: StructType = StructType(StructField("salary", 
LongType) :: Nil)
+// Data types of values in the aggregation buffer
+def bufferSchema: StructType = {
+  StructType(StructField("sum", LongType) :: StructField("count", 
LongType) :: Nil)
+}
+// The data type of the returned value
+def dataType: DataType = DoubleType
+// Whether this function always returns the same output on the 
identical input
+def deterministic: Boolean = true
+// Initializes the given aggregation buffer
+def initialize(buffer: MutableAggregationBuffer): Unit = {
--- End diff --

Agree, I will try to add a small but meaningful explanation here. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-19 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93019035
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:untyped_custom_aggregation$
+import org.apache.spark.sql.expressions.MutableAggregationBuffer
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+// $example off:untyped_custom_aggregation$
+
+object UserDefinedUntypedAggregation {
+
+  // $example on:untyped_custom_aggregation$
+  object MyAverage extends UserDefinedAggregateFunction {
+// Data types of input arguments
+def inputSchema: StructType = StructType(StructField("salary", 
LongType) :: Nil)
--- End diff --

Yes, your point is definitely reasonable. Now I am thinking whether I should keep
"salary" here. As an option, I can replace "salary" with "inputColumn" or
something similar to make `MyAverage` more generic. There is no reason to tie it
to salary. What's your opinion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-18 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/16329

[SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide

## What changes were proposed in this pull request?

- A separate subsection for Aggregations under “Getting Started” in the 
Spark SQL programming guide. It mentions which are predefined and how users can 
create their own.
- Examples of using the `UserDefinedAggregateFunction` abstract class for 
untyped aggregations in Java and Scala.
- Examples of using the `Aggregator` abstract class for type-safe 
aggregations in Java and Scala.
- Python is not covered. 
- The PR might not resolve the ticket since I do not know what was exactly 
planned by the author. 

In total, there are four new standalone examples that can be executed via 
`spark-submit` or `run-example`. The updated Spark SQL programming guide 
references to these examples and does not contain hard-coded snippets. 

## How was this patch tested?

The patch was tested locally by building the docs. The examples were run as 
well. 


![image](https://cloud.githubusercontent.com/assets/6235869/21292915/04d9d084-c515-11e6-811a-999d598dffba.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark SPARK-16046

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16329


commit 8c18b2a980ddfe220c380d0a60e379d9fdeac488
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date:   2016-12-18T10:08:03Z

[SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16024: [MINOR][DOCS] Updates to the Accumulator example ...

2016-11-28 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16024#discussion_r89791975
  
--- Diff: docs/programming-guide.md ---
@@ -1378,29 +1378,36 @@ res2: Long = 10
 
 While this code used the built-in support for accumulators of type Long, 
programmers can also
 create their own types by subclassing 
[AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2).
-The AccumulatorV2 abstract class has several methods which need to 
override: 
-`reset` for resetting the accumulator to zero, and `add` for add anothor 
value into the accumulator, `merge` for merging another same-type accumulator 
into this one. Other methods need to override can refer to scala API document. 
For example, supposing we had a `MyVector` class
-representing mathematical vectors, we could write:
+The AccumulatorV2 abstract class has several methods which one has to 
override:
+`reset` for resetting the accumulator to zero, `add` for adding another 
value into the accumulator,
+and `merge` for merging another same-type accumulator into this one. Other 
methods that must be overridden
+are contained in the [API 
documentation](api/scala/index.html#org.apache.spark.util.AccumulatorV2).
+For example, supposing we had a `MathVector` class representing 
mathematical vectors, we could write:
--- End diff --

Your point makes sense to me. 

The renaming was done to improve the consistency between the Java and Scala
examples. For instance, the Scala example used MyVector, while the Java one used
Vector.

Would it be a better idea to keep MyVector in the Scala example and reuse this
name in Java?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16024: [MINOR][DOCS] Updates to the Accumulator example ...

2016-11-28 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/16024#discussion_r89788527
  
--- Diff: docs/programming-guide.md ---
@@ -1424,29 +1431,38 @@ accum.value();
 // returns 10
 {% endhighlight %}
 
-Programmers can also create their own types by subclassing

-[AccumulatorParam](api/java/index.html?org/apache/spark/AccumulatorParam.html).
-The AccumulatorParam interface has two methods: `zero` for providing a 
"zero value" for your data
-type, and `addInPlace` for adding two values together. For example, 
supposing we had a `Vector` class
-representing mathematical vectors, we could write:
+While this code used the built-in support for accumulators of type Long, 
programmers can also
+create their own types by subclassing 
[AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2).
+The AccumulatorV2 abstract class has several methods which one has to 
override:
+`reset` for resetting the accumulator to zero, `add` for adding another 
value into the accumulator,
+and `merge` for merging another same-type accumulator into this one. Other 
methods that must be overridden
+are contained in the [API 
documentation](api/scala/index.html#org.apache.spark.util.AccumulatorV2).
+For example, supposing we had a `MathVector` class representing 
mathematical vectors, we could write:
 
 {% highlight java %}
-class VectorAccumulatorParam implements AccumulatorParam {
-  public Vector zero(Vector initialValue) {
-return Vector.zeros(initialValue.size());
+class MathVectorAccumulatorV2 implements AccumulatorV2<MathVector, 
MathVector> {
+
+  private MathVector mathVector = MathVector.newZeroVector();
+
+  public void reset() {
+mathVector.reset();
   }
-  public Vector addInPlace(Vector v1, Vector v2) {
-v1.addInPlace(v2); return v1;
+
+  public void add(MathVector v) {
+mathVector.add(v);
   }
+
+  ...
 }
 
-// Then, create an Accumulator of this type:
-Accumulator vecAccum = sc.accumulator(new Vector(...), new 
VectorAccumulatorParam());
+// Create an instance
+MathVectorAccumulatorV2 mathVectorAcc = new MathVectorAccumulatorV2();
+// Register it
+jsc.sc().register(mathVectorAcc, "MathVectorAcc1");
 {% endhighlight %}
 
-In Java, Spark also supports the more general 
[Accumulable](api/java/index.html?org/apache/spark/Accumulable.html)
-interface to accumulate data where the resulting type is not the same as 
the elements added (e.g. build
-a list by collecting together elements).
+Note that, when programmers define their own type of AccumulatorV2,the 
resulting type can be different
--- End diff --

Sure, I'll update that. Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16024: [MINOR][DOCS] Updates to the Accumulator example ...

2016-11-27 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/16024

[MINOR][DOCS] Updates to the Accumulator example in the programming guide. 
Fixed typos, AccumulatorV2 in Java

## What changes were proposed in this pull request?

This pull request contains updates to Scala and Java Accumulator code 
snippets in the programming guide. 

- For Scala, the pull request fixes the signature of the `add()` method in the
custom Accumulator, which took two params (as in the old AccumulatorParam)
instead of one (as in AccumulatorV2); see the sketch after this list.

- The Java example was updated to use the AccumulatorV2 class since 
AccumulatorParam is marked as deprecated.  

- Scala and Java examples are more consistent now.
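
As referenced in the first point, a minimal sketch of a custom `AccumulatorV2`
showing the single-parameter `add()` signature (an illustrative accumulator, not
the one from the guide):

```
import org.apache.spark.util.AccumulatorV2

// Illustrative accumulator that sums Long values. Note that add() takes a
// single value, unlike the old AccumulatorParam.addInPlace(v1, v2).
class LongSumAccumulator extends AccumulatorV2[Long, Long] {
  private var _sum = 0L
  override def isZero: Boolean = _sum == 0L
  override def copy(): LongSumAccumulator = {
    val acc = new LongSumAccumulator
    acc._sum = _sum
    acc
  }
  override def reset(): Unit = _sum = 0L
  override def add(v: Long): Unit = _sum += v
  override def merge(other: AccumulatorV2[Long, Long]): Unit = _sum += other.value
  override def value: Long = _sum
}
```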

## How was this patch tested?

This patch was tested manually by building the docs locally.


![image](https://cloud.githubusercontent.com/assets/6235869/20652099/77d98d18-b4f3-11e6-8565-a995fe8cf8e5.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark fixed_accumulator_example

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16024.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16024


commit f4388a84181e7c40003db0b1574602350160721a
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date:   2016-11-27T21:30:26Z

[MINOR][DOCS] Updates to the Accumulator example in the programming guide. 
Fixed typos, AccumulatorV2 in Java




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14050: [MINOR][EXAMPLES] Window function examples

2016-10-22 Thread aokolnychyi
Github user aokolnychyi closed the pull request at:

https://github.com/apache/spark/pull/14050


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL programmi...

2016-07-12 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/14119
  
**Summary of the updates**

- `JavaSparkSQL.java` file was removed. I kept it initially since the file 
itself was quite old (2+ years) and it was present in your original WIP branch 
alongside the new file. But I can confirm that the new file covers the same 
functionality and more. No need to keep the old one, agree with you.
- Apache header in `JavaSqlDataSourceExample.java` was added.
- `$`-notation instead of `df.col("...")` in Scala examples (see the short snippet after this list).
- `col("...")` instead of `df.col("...")` in Java examples.
- Blank lines before `{% include_example programmatic_schema ... }` were 
added. However, everything was rendered fine locally even without them.
- 2 space indentation for chained method calls. My fault, sorry.
- Actual outputs for all `show()` calls were added.
- Tested manually and via `./dev/run-tests`.
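
As mentioned above, the notation change on the Scala side boils down to the
following (a standalone snippet, not taken from the guide):

```
import org.apache.spark.sql.SparkSession

object NotationExample extends App {
  val spark = SparkSession.builder().master("local[*]").appName("notation").getOrCreate()
  import spark.implicits._

  val df = Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")
  df.filter(df.col("age") > 21).show()  // old df.col("...") style
  df.filter($"age" > 21).show()         // $-notation now used in the Scala examples

  spark.stop()
}
```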

**Open questions**
- Shall I add blank lines before each `{% include_example ... }` or only 
before those two examples?
- I pointed to a wrong location that exceeds the length limit. It is exactly the
same functionality but in Java: lines 113 and 117 of the
`JavaSqlDataSourceExample.java` file. In my view, it would make sense to keep
them as they are for better-looking documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...

2016-07-09 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/14119#discussion_r70173180
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java 
---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:programmatic_schema$
+import java.util.ArrayList;
+import java.util.List;
+// $example off:programmatic_schema$
+// $example on:create_ds$
+import java.util.Arrays;
--- End diff --

Here the imports do not follow alphabetical order, to avoid too many import
groups in the documentation (there would be a blank line between each
"example on/off" block).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...

2016-07-09 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/14119#discussion_r70173131
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+import org.apache.spark.sql.SparkSession
+
+object SqlDataSourceExample {
+
+  case class Person(name: String, age: Long)
+
+  def main(args: Array[String]) {
+val spark = SparkSession
+.builder()
+.appName("Spark SQL Data Soures Example")
+.config("spark.some.config.option", "some-value")
+.getOrCreate()
+
+runBasicDataSourceExample(spark)
+runBasicParquetExample(spark)
+runParquetSchemaMergingExample(spark)
+runJsonDatasetExample(spark)
+
+spark.stop()
+  }
+
+  private def runBasicDataSourceExample(spark: SparkSession): Unit = {
+// $example on:generic_load_save_functions$
+val usersDF = 
spark.read.load("examples/src/main/resources/users.parquet")
+usersDF.select("name", 
"favorite_color").write.save("namesAndFavColors.parquet")
+// $example off:generic_load_save_functions$
+// $example on:manual_load_options$
+val peopleDF = 
spark.read.format("json").load("examples/src/main/resources/people.json")
+peopleDF.select("name", 
"age").write.format("parquet").save("namesAndAges.parquet")
+// $example off:manual_load_options$
+// $example on:direct_sql$
+val sqlDF = spark.sql("SELECT * FROM 
parquet.`examples/src/main/resources/users.parquet`")
--- End diff --

Here the line length slightly exceeds the limit to make the documentation look
better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...

2016-07-09 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/14119#discussion_r70173058
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
 ---
@@ -41,43 +35,47 @@ object HiveFromSpark {
 // in the current directory and creates a directory configured by 
`spark.sql.warehouse.dir`,
 // which defaults to the directory `spark-warehouse` in the current 
directory that the spark
 // application is started.
-val spark = SparkSession.builder
-  .appName("HiveFromSpark")
-  .enableHiveSupport()
-  .getOrCreate()
+
+// $example on:spark_hive$
+// warehouseLocation points to the default location for managed 
databases and tables
+val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
+
+val spark = SparkSession
+.builder()
+.appName("Spark Hive Example")
+.config("spark.sql.warehouse.dir", warehouseLocation)
+.enableHiveSupport()
+.getOrCreate()
 
 import spark.implicits._
 import spark.sql
 
 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE 
src")
+sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO 
TABLE src")
 
 // Queries are expressed in HiveQL
-println("Result of 'SELECT *': ")
-sql("SELECT * FROM src").collect().foreach(println)
+sql("SELECT * FROM src").show()
--- End diff --

I replaced collect().foreach(println) with show() in all examples. Is it OK?


---



[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...

2016-07-09 Thread aokolnychyi
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/14119#discussion_r70173035
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1380,17 +949,17 @@ metadata.
 
 
 {% highlight scala %}
-// spark is an existing HiveContext
-spark.refreshTable("my_table")
+// spark is an existing SparkSession
+spark.catalog.refreshTable("my_table")
--- End diff --

Is this the correct way to refresh a table?
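
For context, a minimal sketch of the usage I had in mind (assuming `spark` is
an existing SparkSession and the files backing `my_table` were changed outside
of Spark):

    // invalidate any cached metadata for the table
    spark.catalog.refreshTable("my_table")

    // later reads should pick up the new files
    spark.table("my_table").show()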


---



[GitHub] spark issue #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL programmi...

2016-07-09 Thread aokolnychyi
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/14119
  
@liancheng, could you please review this PR?


---



[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...

2016-07-09 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/14119

[SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL programming guide and examples

## What changes were proposed in this pull request?

- Hard-coded Spark SQL sample snippets were moved into source files under 
the examples sub-project (see the sketch below).
- Inconsistencies between the Scala and Java Spark SQL examples were removed.
- The Scala and Java Spark SQL examples were updated.
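
To illustrate the mechanism (label and path below are illustrative, not
necessarily the final ones): each snippet now lives in a runnable example file
and is delimited by marker comments, e.g.

    // examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
    // $example on:generic_load_save_functions$
    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    // $example off:generic_load_save_functions$

and the programming guide pulls the snippet in by label instead of duplicating
it, roughly like

    {% include_example generic_load_save_functions scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

so the docs cannot drift away from code that is actually compiled and run.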

## How was this patch tested?

The work is still in progress. All examples involved were tested manually. 
An additional round of testing will be done after the code review.




![image](https://cloud.githubusercontent.com/assets/6235869/16710314/51851606-462a-11e6-9fbe-0818daef65e4.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark_16303

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14119.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14119


commit 95f0f41fa12e1c6f0fb8ce6cd4222fb63842b495
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date:   2016-07-09T20:56:47Z

[SPARK-16303][DOCS][EXAMPLES] Updated SQL programming guide and examples




---



[GitHub] spark pull request #14050: [MINOR][EXAMPLES] Window function examples

2016-07-04 Thread aokolnychyi
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/14050

[MINOR][EXAMPLES] Window function examples

## What changes were proposed in this pull request?

Added an example that explains the usage of window functions. It shows the 
difference between no/unbounded/bounded window frames and how they are 
resolved. The example also covers the two ways to define window frames: based 
on physical (`rowsBetween`) and logical (`rangeBetween`) offsets.

The example should be useful for people who do not have much experience 
with window functions since it explains how Spark internally deals with window 
frames.
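
A minimal sketch of the kind of comparison the example makes (column names and
data are illustrative, not the exact code in the PR):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    // assumes `spark` is an existing SparkSession
    import spark.implicits._

    val sales = Seq(("A", 1, 100L), ("A", 2, 200L), ("A", 4, 300L), ("B", 1, 50L))
      .toDF("group", "id", "amount")

    // physical frame: the previous row and the current row (0 = current row)
    val byRows = Window.partitionBy("group").orderBy("id").rowsBetween(-1, 0)

    // logical frame: rows whose `id` is within 1 of the current row's `id`
    val byRange = Window.partitionBy("group").orderBy("id").rangeBetween(-1, 0)

    sales.select($"*",
      sum("amount").over(byRows).as("sum_rows"),
      sum("amount").over(byRange).as("sum_range")
    ).show()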

## How was this patch tested?

The existing tests were run with no failures. No additional test cases are 
needed.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark window_function_examples

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14050


commit fed17613e23c1634ae47542a00960dac77bc95fc
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date:   2016-07-04T22:25:39Z

[MINOR][EXAMPLES] Window function examples




---