[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19056

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r137143373

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
-    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
-      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rdd = sqlContext.sparkContext.
```
--- End diff --

Done.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r137142093

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
-    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
-      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rdd = sqlContext.sparkContext.
```
--- End diff --

We generally put the '.' on the next line. So:
```
val rdd = sqlContext.sparkContext
  .parall ...
  .map ...
```
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r137048000

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
     val rdd = sqlContext.sparkContext.parallelize(rawList).map(
       v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
```
--- End diff --

Done.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r136638327

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
     val rdd = sqlContext.sparkContext.parallelize(rawList).map(
       v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
```
--- End diff --

@joseph-torres this is a nit but a good suggestion.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135989439

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
     val rdd = sqlContext.sparkContext.parallelize(rawList).map(
       v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
```
--- End diff --

May I ask to replace `v` with `case ...` as follows? IMHO that would make things easier to read.
```
val rdd = sqlContext.sparkContext.
  parallelize(rawList).
  map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
```
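As an aside, the readability difference the reviewer is pointing at can be shown with a minimal, Spark-free Scala sketch. The data and transformations below are hypothetical stand-ins (the real `rawList` holds `(String, Timestamp)` pairs buffered by the socket source):

```scala
// Hypothetical stand-in data: (value, epoch-millis) pairs, analogous to the
// (String, Timestamp) tuples the socket source buffers.
val rawList = List(("hello", 1000L), ("world", 2000L))

// Positional access: compiles fine, but `_1` and `_2` carry no meaning.
val positional = rawList.map(v => (v._1.toUpperCase, v._2))

// Pattern-matching with `case`: each tuple field gets a descriptive name.
val destructured = rawList.map { case (value, timestamp) =>
  (value.toUpperCase, timestamp)
}
```

Both forms produce the same result; only the reader's experience differs.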
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135851647

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
-    import sqlContext.implicits._
-    val rawBatch = sqlContext.createDataset(rawList)
+    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {
       rawBatch.toDF("value", "timestamp")
```
--- End diff --

Done.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135851433

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
-    import sqlContext.implicits._
-    val rawBatch = sqlContext.createDataset(rawList)
+    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {
       rawBatch.toDF("value", "timestamp")
     } else {
       // Strip out timestamp
-      rawBatch.select("_1").toDF("value")
+      rawBatch.select("value").toDF()
```
--- End diff --

Done.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135851225

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -39,6 +39,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)

   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

+  override protected def checkInvariants(
+      result: LogicalPlan,
+      original: LogicalPlan,
+      rule: Rule[LogicalPlan]): Unit = {
+    assert(
+      result.isStreaming == original.isStreaming,
+      s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}:" +
```
--- End diff --

Done.
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135610992

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
-    import sqlContext.implicits._
-    val rawBatch = sqlContext.createDataset(rawList)
+    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {
       rawBatch.toDF("value", "timestamp")
```
--- End diff --

I think the schema will already determine which fields are included in `rawBatch`, so this `if` is no longer necessary.
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135610632

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
```
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
-    import sqlContext.implicits._
-    val rawBatch = sqlContext.createDataset(rawList)
+    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {
       rawBatch.toDF("value", "timestamp")
     } else {
       // Strip out timestamp
-      rawBatch.select("_1").toDF("value")
+      rawBatch.select("value").toDF()
```
--- End diff --

`toDF` is unnecessary since it's already a DataFrame.
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135610234

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -39,6 +39,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)

   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

+  override protected def checkInvariants(
+      result: LogicalPlan,
+      original: LogicalPlan,
+      rule: Rule[LogicalPlan]): Unit = {
+    assert(
+      result.isStreaming == original.isStreaming,
+      s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}:" +
```
--- End diff --

Space before the closing `"`?
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135360845

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala ---
```
@@ -65,11 +66,12 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case _: RepartitionByExpression => empty(p)
     // An aggregate with non-empty group expression will return one output row per group when the
     // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
-    // will be empty and thus the output will be empty.
+    // will be empty and thus the output will be empty. If we're working on batch data, we can
+    // then treat the aggregate as redundant.
     //
     // If the grouping expressions are empty, however, then the aggregate will always produce a
     // single output row and thus we cannot propagate the EmptyRelation.
-    case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
+    case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
```
--- End diff --

Also make sure that this exception is covered by the tests.
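A Spark-free sketch of what such a test could check: an aggregate over an empty relation collapses only in batch mode, never in streaming mode. `Plan`, `Aggregate`, and `EmptyRelation` below are hypothetical stand-ins for the Catalyst types, illustrating the guard's intent rather than Spark's actual test code:

```scala
// Hypothetical miniature plan types standing in for Catalyst's LogicalPlan.
sealed trait Plan { def isStreaming: Boolean }
case class EmptyRelation(isStreaming: Boolean) extends Plan
case class Aggregate(groupExprs: List[String], child: Plan) extends Plan {
  def isStreaming: Boolean = child.isStreaming
}

// Stand-in for the PropagateEmptyRelation rule: collapse an aggregate over
// an empty child only when grouping exprs are non-empty AND the plan is batch.
def propagateEmptyRelation(p: Plan): Plan = p match {
  case a @ Aggregate(ge, EmptyRelation(_)) if ge.nonEmpty && !a.isStreaming =>
    EmptyRelation(isStreaming = false)
  case other => other  // streaming aggregates (and others) are left intact
}
```

The key assertion is the negative one: a streaming aggregate must survive the rule unchanged.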
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135360650

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala ---
```
@@ -65,11 +66,12 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case _: RepartitionByExpression => empty(p)
     // An aggregate with non-empty group expression will return one output row per group when the
     // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
-    // will be empty and thus the output will be empty.
+    // will be empty and thus the output will be empty. If we're working on batch data, we can
+    // then treat the aggregate as redundant.
     //
     // If the grouping expressions are empty, however, then the aggregate will always produce a
     // single output row and thus we cannot propagate the EmptyRelation.
-    case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
+    case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
```
--- End diff --

Can you add to the docs above why we are avoiding this when it's streaming?
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135358693

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala ---
```
@@ -63,6 +63,11 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   /** Defines a sequence of rule batches, to be overridden by the implementation. */
   protected def batches: Seq[Batch]

+  /** Checks invariants that should hold across rule execution. */
```
--- End diff --

nit: rule execution*s*
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135358635

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala ---
```
@@ -86,6 +91,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             val runTime = System.nanoTime() - startTime
             RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)

+            checkInvariants(result, plan, rule)
```
--- End diff --

Call this only when the plan has changed. So just move this inside the condition below.
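The suggestion above can be sketched without Spark: guard the invariant check behind a "did the rule change the plan?" test, so unchanged plans skip it entirely. `Plan`, `applyRule`, and the mutable counter below are hypothetical illustration-only constructs; in `RuleExecutor` the change test is `!result.fastEquals(plan)`:

```scala
// Hypothetical stand-in for Catalyst's LogicalPlan.
case class Plan(name: String, isStreaming: Boolean)

// Counts how often the (potentially costly) invariant check actually runs.
var invariantChecks = 0

def checkInvariants(result: Plan, original: Plan, ruleName: String): Unit = {
  invariantChecks += 1
  assert(result.isStreaming == original.isStreaming,
    s"Rule $ruleName changed isStreaming from original ${original.isStreaming}")
}

def applyRule(plan: Plan, ruleName: String)(rule: Plan => Plan): Plan = {
  val result = rule(plan)
  // Analogous to `!result.fastEquals(plan)`: skip the invariant check
  // entirely when the rule left the plan unchanged.
  if (result != plan) checkInvariants(result, plan, ruleName)
  result
}
```

A no-op rule then pays nothing; only rules that produce a different plan trigger the check.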
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135358597

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -39,6 +39,15 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)

   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

+  override protected def checkInvariants(
+      result: LogicalPlan,
+      original: LogicalPlan,
+      rule: Rule[LogicalPlan]): Unit = {
+    assert(
+      result.isStreaming == original.isStreaming,
+      s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}")
```
--- End diff --

Print the original and result plans as well, so that it's easy to debug.
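One way to address this, sketched Spark-free: fold both plan renderings into the assertion message, so a failure shows exactly which transformation flipped the bit. `Plan` and its `treeString` field are hypothetical stand-ins for `LogicalPlan` and its tree rendering:

```scala
// Hypothetical stand-in: `treeString` plays the role of LogicalPlan's
// multi-line tree rendering.
case class Plan(treeString: String, isStreaming: Boolean)

def checkIsStreamingPreserved(result: Plan, original: Plan, ruleName: String): Unit =
  assert(
    result.isStreaming == original.isStreaming,
    s"""Rule $ruleName changed isStreaming from original ${original.isStreaming}:
       |Original plan:
       |${original.treeString}
       |Result plan:
       |${result.treeString}""".stripMargin)
```

Since `assert`'s message argument is evaluated lazily, the plan strings are only built when the invariant actually fails.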
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19056

[SPARK-21765] Check that optimization doesn't affect isStreaming bit.

## What changes were proposed in this pull request?

Add an assert in logical plan optimization that the isStreaming bit stays the same, and fix empty relation rules where that wasn't happening.

## How was this patch tested?

New and existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21765-followup

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19056.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19056

commit b83349567760dd0d33388d3fc68d8db1b648e1f1
Author: Jose Torres
Date: 2017-08-25T20:48:49Z

    Check that optimization doesn't affect isStreaming bit.