[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19056


---




[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-05 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r137143373
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-val rdd = sqlContext.sparkContext.parallelize(rawList).map(
-v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rdd = sqlContext.sparkContext.
--- End diff --

done


---




[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r137142093
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-val rdd = sqlContext.sparkContext.parallelize(rawList).map(
-v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rdd = sqlContext.sparkContext.
--- End diff --

We generally put the `.` on the next line. So:
```
val rdd = sqlContext.sparkContext
   .parall ...
   .map 
```
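
For the full expression, that style would look something like this (a sketch pieced together from the diffs in this thread, not the exact committed code):

```
val rdd = sqlContext.sparkContext
  .parallelize(rawList)
  .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
```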


---




[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-05 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r137048000
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
 
 val rdd = sqlContext.sparkContext.parallelize(rawList).map(
 v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
--- End diff --

Done


---




[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-09-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r136638327
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
 
 val rdd = sqlContext.sparkContext.parallelize(rawList).map(
 v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
--- End diff --

@joseph-torres this is a nit but a good suggestion. 


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-30 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135989439
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
 
 val rdd = sqlContext.sparkContext.parallelize(rawList).map(
 v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
--- End diff --

May I ask you to replace `v` with `case...` as follows? IMHO that would make things easier to read.

```
val rdd = sqlContext.sparkContext.
  parallelize(rawList).
  map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
```


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135851647
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-import sqlContext.implicits._
-val rawBatch = sqlContext.createDataset(rawList)
+val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
 
 // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
 // if requested.
 if (includeTimestamp) {
   rawBatch.toDF("value", "timestamp")
--- End diff --

Done.


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135851433
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-import sqlContext.implicits._
-val rawBatch = sqlContext.createDataset(rawList)
+val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
 
 // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
 // if requested.
 if (includeTimestamp) {
   rawBatch.toDF("value", "timestamp")
 } else {
   // Strip out timestamp
-  rawBatch.select("_1").toDF("value")
+  rawBatch.select("value").toDF()
--- End diff --

Done.


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-29 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135851225
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -39,6 +39,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 
   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
 
+  override protected def checkInvariants(
+  result: LogicalPlan,
+  original: LogicalPlan,
+  rule: Rule[LogicalPlan]): Unit = {
+assert(
+  result.isStreaming == original.isStreaming,
+  s"Rule ${rule.ruleName} changed isStreaming from original 
${original.isStreaming}:" +
--- End diff --

Done.


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-28 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135610992
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-import sqlContext.implicits._
-val rawBatch = sqlContext.createDataset(rawList)
+val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
 
 // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
 // if requested.
 if (includeTimestamp) {
   rawBatch.toDF("value", "timestamp")
--- End diff --

I think the schema already determines which fields are included in `rawBatch`, so this `if` is no longer necessary.


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-28 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135610632
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   batches.slice(sliceStart, sliceEnd)
 }
 
-import sqlContext.implicits._
-val rawBatch = sqlContext.createDataset(rawList)
+val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
 
 // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
 // if requested.
 if (includeTimestamp) {
   rawBatch.toDF("value", "timestamp")
 } else {
   // Strip out timestamp
-  rawBatch.select("_1").toDF("value")
+  rawBatch.select("value").toDF()
--- End diff --

`toDF` is unnecessary since it's already a DataFrame.
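
A minimal sketch of the suggested simplification (assuming `rawBatch` is the DataFrame built in the diff above):

```
// select() on a DataFrame already returns a DataFrame, so no toDF() is needed
rawBatch.select("value")
```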


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-28 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135610234
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -39,6 +39,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 
   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
 
+  override protected def checkInvariants(
+  result: LogicalPlan,
+  original: LogicalPlan,
+  rule: Rule[LogicalPlan]): Unit = {
+assert(
+  result.isStreaming == original.isStreaming,
+  s"Rule ${rule.ruleName} changed isStreaming from original 
${original.isStreaming}:" +
--- End diff --

Space before the closing `"`?


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135360845
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala ---
@@ -65,11 +66,12 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
   case _: RepartitionByExpression => empty(p)
   // An aggregate with non-empty group expression will return one output row per group when the
   // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
-  // will be empty and thus the output will be empty.
+  // will be empty and thus the output will be empty. If we're working on batch data, we can
+  // then treat the aggregate as redundant.
   //
   // If the grouping expressions are empty, however, then the aggregate will always produce a
   // single output row and thus we cannot propagate the EmptyRelation.
-  case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
+  case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
--- End diff --

Also make sure that this exception is covered by the tests.
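
A hypothetical test in the spirit of this request; the suite setup (`Optimize`, `comparePlans`), the catalyst DSL helpers, and `LocalRelation`'s `isStreaming` parameter are assumptions here, not the test that was actually committed:

```
test("don't eliminate a grouped aggregate over an empty streaming relation") {
  // Empty but streaming: "no rows in this batch" does not mean "no rows ever".
  val relation = LocalRelation(Seq('a.int), data = Seq.empty, isStreaming = true)
  val query = relation.groupBy('a)(count(1)).analyze
  val optimized = Optimize.execute(query)
  // The new guard `ge.nonEmpty && !p.isStreaming` must leave the plan alone.
  comparePlans(optimized, query)
}
```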


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135360650
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala ---
@@ -65,11 +66,12 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
   case _: RepartitionByExpression => empty(p)
   // An aggregate with non-empty group expression will return one output row per group when the
   // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
-  // will be empty and thus the output will be empty.
+  // will be empty and thus the output will be empty. If we're working on batch data, we can
+  // then treat the aggregate as redundant.
   //
   // If the grouping expressions are empty, however, then the aggregate will always produce a
   // single output row and thus we cannot propagate the EmptyRelation.
-  case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
+  case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
--- End diff --

Can you add to the docs above why we are avoiding this when it's streaming?
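
A sketch of the kind of explanation being asked for (illustrative wording, not the comment that was actually committed):

```
// In streaming, an empty relation only means the current batch is empty;
// the grouped aggregate may still hold state from earlier batches and
// must keep producing output for it, so it cannot be propagated away.
case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
```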


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135358693
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala ---
@@ -63,6 +63,11 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   /** Defines a sequence of rule batches, to be overridden by the implementation. */
   protected def batches: Seq[Batch]
 
+  /** Checks invariants that should hold across rule execution. */
--- End diff --

nit: rule execution*s*


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135358635
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala ---
@@ -86,6 +91,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 val runTime = System.nanoTime() - startTime
 RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
+checkInvariants(result, plan, rule)
--- End diff --

Call this only when the plan has changed. So just move this inside the 
condition below.
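
A sketch of the suggested placement (the loop in `RuleExecutor.execute` is simplified here, and the `fastEquals` change check is an assumption about the surrounding code, not the exact merged diff):

```
val result = rule(plan)
if (!result.fastEquals(plan)) {
  // Only a rule that actually rewrote the plan can break an invariant.
  checkInvariants(result, plan, rule)
  logTrace(s"=== Applying Rule ${rule.ruleName} ===")
}
```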


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19056#discussion_r135358597
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -39,6 +39,15 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 
   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
 
+  override protected def checkInvariants(
+  result: LogicalPlan,
+  original: LogicalPlan,
+  rule: Rule[LogicalPlan]): Unit = {
+assert(
+  result.isStreaming == original.isStreaming,
+  s"Rule ${rule.ruleName} changed isStreaming from original 
${original.isStreaming}")
--- End diff --

Print the original and result plans as well, so that it's easy to debug.
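
Something like the following (a sketch; the first part of the message matches the revised diff quoted earlier in this thread, while the plan-printing tail is illustrative):

```
assert(
  result.isStreaming == original.isStreaming,
  s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}:" +
    s"\nOriginal plan:\n$original\nNew plan:\n$result")
```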


---



[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...

2017-08-25 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19056

[SPARK-21765] Check that optimization doesn't affect isStreaming bit.

## What changes were proposed in this pull request?

Add an assert in logical plan optimization that the isStreaming bit stays the same, and fix empty relation rules where that wasn't happening.
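
A condensed sketch of the mechanism, pieced together from the diffs reviewed above (simplified, not the exact merged code):

```
// RuleExecutor gains an overridable hook, invoked after each rule application:
/** Checks invariants that should hold across rule executions. */
protected def checkInvariants(result: TreeType, original: TreeType, rule: Rule[TreeType]): Unit = {}

// Optimizer overrides it to pin the isStreaming bit in place:
override protected def checkInvariants(
    result: LogicalPlan, original: LogicalPlan, rule: Rule[LogicalPlan]): Unit = {
  assert(result.isStreaming == original.isStreaming,
    s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}")
}
```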

## How was this patch tested?

new and existing unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21765-followup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19056


commit b83349567760dd0d33388d3fc68d8db1b648e1f1
Author: Jose Torres 
Date:   2017-08-25T20:48:49Z

Check that optimization doesn't affect isStreaming bit.




---