[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19196


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139840990
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with 
SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+requiredDistribution = keys => ClusteredDistribution(keys),
+expectedPartitioning =
+  keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+requiredDistribution = keys => ClusteredDistribution(keys),
+expectedPartitioning =
+  keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+requiredDistribution = _ => AllTuples,
+expectedPartitioning = _ => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+requiredDistribution = _ => AllTuples,
+expectedPartitioning = _ => SinglePartition,
+expectShuffle = false)
+
+  /**
+   * For `StatefulOperator` with the given `requiredChildDistribution`, 
and child SparkPlan
+   * `inputPlan`, ensures that the incremental planner adds exchanges, if 
required, in order to
+   * ensure the expected partitioning.
+   */
+  private def testEnsureStatefulOpPartitioning(
+  testName: String,
+  inputPlan: SparkPlan,
+  requiredDistribution: Seq[Attribute] => Distribution,
+  expectedPartitioning: Seq[Attribute] => Partitioning,
+  expectShuffle: Boolean): Unit = {
+test(testName) {
+  val operator = TestStatefulOperator(inputPlan, 
requiredDistribution(inputPlan.output.take(1)))
+  val executed = executePlan(operator, OutputMode.Complete())
+  if (expectShuffle) {
+val exchange = executed.children.find(_.isInstanceOf[Exchange])
+if (exchange.isEmpty) {
+  fail(s"Was expecting an exchange but didn't get one 
in:\n$executed")
+}
+assert(exchange.get ===
+  ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), 
inputPlan),
+  s"Exchange didn't have expected properties:\n${exchange.get}")
+  } else {
+assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+  s"Unexpected exchange found in:\n$executed")
+  }
+}
+  }
+
+  /** Executes a SparkPlan using the IncrementalPlanner used for 
Structured 

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139578185
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with 
SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => AllTuples,
--- End diff --

nit: add `requiredDistribution = ` and `expectedPartitioning = ` for 
greater readability
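For reference, a minimal sketch of the suggested named-argument style (the values are simply taken from the call sites quoted above, so this is illustrative only):

    testEnsureStatefulOpPartitioning(
      "AllTuples generates Exchange with SinglePartition",
      baseDf.queryExecution.sparkPlan,
      requiredDistribution = _ => AllTuples,
      expectedPartitioning = _ => SinglePartition,
      expectShuffle = true)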


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139578096
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with 
SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = false)
+
+  private def testEnsureStatefulOpPartitioning(
--- End diff --

nit: add some docs specifying what it tests.
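A possible doc comment along these lines (essentially what a later revision of the suite, quoted earlier in this thread, adds):

    /**
     * For `StatefulOperator` with the given `requiredChildDistribution`, and child SparkPlan
     * `inputPlan`, ensures that the incremental planner adds exchanges, if required, in order
     * to ensure the expected partitioning.
     */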


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139578010
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with 
SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = false)
+
+  private def testEnsureStatefulOpPartitioning(
+  testName: String,
+  inputPlan: SparkPlan,
+  requiredDistribution: Seq[Attribute] => Distribution,
+  expectedPartitioning: Seq[Attribute] => Partitioning,
+  expectShuffle: Boolean): Unit = {
+test("EnsureStatefulOpPartitioning - " + testName) {
+  val operator = TestOperator(inputPlan, 
requiredDistribution(inputPlan.output.take(1)))
+  val executed = executePlan(operator, OutputMode.Complete())
+  if (expectShuffle) {
+val exchange = executed.children.find(_.isInstanceOf[Exchange])
+if (exchange.isEmpty) {
+  fail(s"Was expecting an exchange but didn't get one 
in:\n$executed")
+}
+assert(exchange.get ===
+  ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), 
inputPlan),
+  s"Exchange didn't have expected properties:\n${exchange.get}")
+  } else {
+assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+  s"Unexpected exchange found in:\n$executed")
+  }
+}
+  }
+
+  private def executePlan(
+  p: SparkPlan,
+  outputMode: OutputMode = OutputMode.Append()): SparkPlan = {
+val execution = new IncrementalExecution(
+  spark,
+  null,
+  OutputMode.Complete(),
+  "chk",
+  UUID.randomUUID(),
+  0L,
+  OffsetSeqMetadata()) {
+  override lazy val sparkPlan: SparkPlan = p transform {
+case plan: SparkPlan =>
+  val inputMap = plan.children.flatMap(_.output).map(a => (a

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139577945
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with 
SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => ClusteredDistribution(keys),
+keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+keys => AllTuples,
+keys => SinglePartition,
+expectShuffle = false)
+
+  private def testEnsureStatefulOpPartitioning(
+  testName: String,
+  inputPlan: SparkPlan,
+  requiredDistribution: Seq[Attribute] => Distribution,
+  expectedPartitioning: Seq[Attribute] => Partitioning,
+  expectShuffle: Boolean): Unit = {
+test("EnsureStatefulOpPartitioning - " + testName) {
+  val operator = TestOperator(inputPlan, 
requiredDistribution(inputPlan.output.take(1)))
+  val executed = executePlan(operator, OutputMode.Complete())
+  if (expectShuffle) {
+val exchange = executed.children.find(_.isInstanceOf[Exchange])
+if (exchange.isEmpty) {
+  fail(s"Was expecting an exchange but didn't get one 
in:\n$executed")
+}
+assert(exchange.get ===
+  ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), 
inputPlan),
+  s"Exchange didn't have expected properties:\n${exchange.get}")
+  } else {
+assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+  s"Unexpected exchange found in:\n$executed")
+  }
+}
+  }
+
+  private def executePlan(
+  p: SparkPlan,
+  outputMode: OutputMode = OutputMode.Append()): SparkPlan = {
+val execution = new IncrementalExecution(
+  spark,
+  null,
+  OutputMode.Complete(),
+  "chk",
+  UUID.randomUUID(),
+  0L,
+  OffsetSeqMetadata()) {
+  override lazy val sparkPlan: SparkPlan = p transform {
+case plan: SparkPlan =>
+  val inputMap = plan.children.flatMap(_.output).map(a => (a

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139577898
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/IncrementalExecutionRulesSuite.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class IncrementalExecutionRulesSuite extends SparkPlanTest with 
SharedSQLContext {
--- End diff --

How about making this the EnsureStatefulOpPartitioningSuite?
This naming pattern is followed by the test suites for many other optimization rules 
(PropagateEmptyRelationSuite, CollapseProjectSuite, ...).


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-15 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139197660
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,187 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  /**
+   * This method verifies certain properties in the SparkPlan of a 
streaming aggregation.
+   * First of all, it checks that the child of a `StateStoreRestoreExec` 
creates the desired
+   * data distribution, where the child could be an Exchange, or a 
`HashAggregateExec` which already
+   * provides the expected data distribution.
+   *
+   * The second thing it checks that the child provides the expected 
number of partitions.
+   *
+   * The third thing it checks that we don't add an unnecessary shuffle 
in-between
+   * `StateStoreRestoreExec` and `StateStoreSaveExec`.
+   */
+  private def checkAggregationChain(
+  se: StreamExecution,
+  expectShuffling: Boolean,
+  expectedPartition: Int): Boolean = {
+val executedPlan = se.lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition,
+  "Didn't get the expected number of partitions.")
+if (expectShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+true
+  }
+
+  /** Add blocks of data to the `BlockRDDBackedSource`. */
+  case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) 
extends AddData {
+override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
+  if (data.nonEmpty) {
+data.foreach(source.addData)
+  } else {
+// we would like to create empty blockRDD's so add an empty block 
here.
+source.addData()
+  }
+  source.releaseLock()
+  (source, LongOffset(source.counter))
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned to 1") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+testStream(aggregated, Complete())(
+  AddBlockData(inputSource, Seq(1)),
+  CheckLastBatch(1),
+  AssertOnQuery("Verify no shuffling") { se =>
+checkAggregationChain(se, expectShuffling = false, 1)
+  },
+  AddBlockData(inputSource), // create an empty trigger
+  CheckLastBatch(1),
+  AssertOnQuery("Verify addition of exchange operator") { se =>
+checkAggregationChain(se, expectShuffling = true, 1)
+  },
+  AddBlockData(inputSource, Seq(2, 3)),
+  CheckLastBatch(3),
+  AddBlockData(inputSource),
+  CheckLastBatch(3),
+  StopStream
+)
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has 
keyExpressions") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+
+def createDf(partitions: Int): Dataset[(Long, Long)] = {
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(partitions)
+.groupBy('a % 1) // just to give it a fake key
+.count()
+.as[(Long, Long)]
+}
+
+testStream(createDf(1), Complete())(
+  StartStream(ch

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139083835
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,187 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  /**
+   * This method verifies certain properties in the SparkPlan of a 
streaming aggregation.
+   * First of all, it checks that the child of a `StateStoreRestoreExec` 
creates the desired
+   * data distribution, where the child could be an Exchange, or a 
`HashAggregateExec` which already
+   * provides the expected data distribution.
+   *
+   * The second thing it checks that the child provides the expected 
number of partitions.
+   *
+   * The third thing it checks that we don't add an unnecessary shuffle 
in-between
+   * `StateStoreRestoreExec` and `StateStoreSaveExec`.
+   */
+  private def checkAggregationChain(
+  se: StreamExecution,
+  expectShuffling: Boolean,
+  expectedPartition: Int): Boolean = {
+val executedPlan = se.lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition,
+  "Didn't get the expected number of partitions.")
+if (expectShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+true
+  }
+
+  /** Add blocks of data to the `BlockRDDBackedSource`. */
+  case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) 
extends AddData {
+override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
+  if (data.nonEmpty) {
+data.foreach(source.addData)
+  } else {
+// we would like to create empty blockRDD's so add an empty block 
here.
+source.addData()
+  }
+  source.releaseLock()
+  (source, LongOffset(source.counter))
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned to 1") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+testStream(aggregated, Complete())(
+  AddBlockData(inputSource, Seq(1)),
+  CheckLastBatch(1),
+  AssertOnQuery("Verify no shuffling") { se =>
+checkAggregationChain(se, expectShuffling = false, 1)
+  },
+  AddBlockData(inputSource), // create an empty trigger
+  CheckLastBatch(1),
+  AssertOnQuery("Verify addition of exchange operator") { se =>
+checkAggregationChain(se, expectShuffling = true, 1)
+  },
+  AddBlockData(inputSource, Seq(2, 3)),
+  CheckLastBatch(3),
+  AddBlockData(inputSource),
+  CheckLastBatch(3),
+  StopStream
+)
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has 
keyExpressions") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+
+def createDf(partitions: Int): Dataset[(Long, Long)] = {
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(partitions)
+.groupBy('a % 1) // just to give it a fake key
+.count()
+.as[(Long, Long)]
+}
+
+testStream(createDf(1), Complete())(
+  StartStream(chec

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139083752
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,187 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  /**
+   * This method verifies certain properties in the SparkPlan of a 
streaming aggregation.
+   * First of all, it checks that the child of a `StateStoreRestoreExec` 
creates the desired
+   * data distribution, where the child could be an Exchange, or a 
`HashAggregateExec` which already
+   * provides the expected data distribution.
+   *
+   * The second thing it checks that the child provides the expected 
number of partitions.
+   *
+   * The third thing it checks that we don't add an unnecessary shuffle 
in-between
+   * `StateStoreRestoreExec` and `StateStoreSaveExec`.
+   */
+  private def checkAggregationChain(
+  se: StreamExecution,
+  expectShuffling: Boolean,
+  expectedPartition: Int): Boolean = {
+val executedPlan = se.lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition,
+  "Didn't get the expected number of partitions.")
+if (expectShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+true
+  }
+
+  /** Add blocks of data to the `BlockRDDBackedSource`. */
+  case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) 
extends AddData {
+override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
+  if (data.nonEmpty) {
+data.foreach(source.addData)
+  } else {
+// we would like to create empty blockRDD's so add an empty block 
here.
+source.addData()
+  }
+  source.releaseLock()
+  (source, LongOffset(source.counter))
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned to 1") {
--- End diff --

Can you add more docs to explain this test? This is testing a complicated 
edge case, so more documentation is necessary.
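As a sketch only (the wording is illustrative, inferred from the test name and the assertions above rather than from the PR itself), the extra documentation above the test could look like:

    // SPARK-21977: coalesce(1) normally reports a single output partition, so no exchange is
    // planned for the global aggregation. A trigger with no data, however, produces an RDD
    // with 0 partitions, and the stateful aggregation must still be repartitioned to exactly
    // 1 partition so that its state can be restored and saved.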


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139079444
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,187 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  /**
+   * This method verifies certain properties in the SparkPlan of a 
streaming aggregation.
+   * First of all, it checks that the child of a `StateStoreRestoreExec` 
creates the desired
+   * data distribution, where the child could be an Exchange, or a 
`HashAggregateExec` which already
+   * provides the expected data distribution.
+   *
+   * The second thing it checks that the child provides the expected 
number of partitions.
+   *
+   * The third thing it checks that we don't add an unnecessary shuffle 
in-between
+   * `StateStoreRestoreExec` and `StateStoreSaveExec`.
+   */
+  private def checkAggregationChain(
+  se: StreamExecution,
+  expectShuffling: Boolean,
+  expectedPartition: Int): Boolean = {
+val executedPlan = se.lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition,
+  "Didn't get the expected number of partitions.")
+if (expectShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+true
+  }
+
+  /** Add blocks of data to the `BlockRDDBackedSource`. */
+  case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) 
extends AddData {
+override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
+  if (data.nonEmpty) {
+data.foreach(source.addData)
+  } else {
+// we would like to create empty blockRDD's so add an empty block 
here.
+source.addData()
+  }
+  source.releaseLock()
+  (source, LongOffset(source.counter))
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned to 1") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+testStream(aggregated, Complete())(
+  AddBlockData(inputSource, Seq(1)),
+  CheckLastBatch(1),
+  AssertOnQuery("Verify no shuffling") { se =>
+checkAggregationChain(se, expectShuffling = false, 1)
+  },
+  AddBlockData(inputSource), // create an empty trigger
+  CheckLastBatch(1),
+  AssertOnQuery("Verify addition of exchange operator") { se =>
+checkAggregationChain(se, expectShuffling = true, 1)
+  },
+  AddBlockData(inputSource, Seq(2, 3)),
+  CheckLastBatch(3),
+  AddBlockData(inputSource),
+  CheckLastBatch(3),
+  StopStream
+)
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has 
keyExpressions") {
--- End diff --

what is `keyExpressions`?


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139078992
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,187 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  /**
+   * This method verifies certain properties in the SparkPlan of a 
streaming aggregation.
+   * First of all, it checks that the child of a `StateStoreRestoreExec` 
creates the desired
+   * data distribution, where the child could be an Exchange, or a 
`HashAggregateExec` which already
+   * provides the expected data distribution.
+   *
+   * The second thing it checks that the child provides the expected 
number of partitions.
+   *
+   * The third thing it checks that we don't add an unnecessary shuffle 
in-between
+   * `StateStoreRestoreExec` and `StateStoreSaveExec`.
+   */
+  private def checkAggregationChain(
+  se: StreamExecution,
+  expectShuffling: Boolean,
+  expectedPartition: Int): Boolean = {
+val executedPlan = se.lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition,
+  "Didn't get the expected number of partitions.")
+if (expectShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+true
+  }
+
+  /** Add blocks of data to the `BlockRDDBackedSource`. */
+  case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) 
extends AddData {
+override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
+  if (data.nonEmpty) {
+data.foreach(source.addData)
+  } else {
+// we would like to create empty blockRDD's so add an empty block 
here.
+source.addData()
+  }
+  source.releaseLock()
+  (source, LongOffset(source.counter))
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned to 1") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
--- End diff --

nit: collapse this query (and the similar ones elsewhere); as written it makes the tests 
look unnecessarily long.
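For example, the query above could be collapsed into something like (same content, fewer lines; just a sketch):

    val aggregated = spark.readStream
      .format((new MockSourceProvider).getClass.getCanonicalName)
      .load().coalesce(1).groupBy().count().as[Long]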


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139078823
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,187 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  /**
+   * This method verifies certain properties in the SparkPlan of a 
streaming aggregation.
+   * First of all, it checks that the child of a `StateStoreRestoreExec` 
creates the desired
+   * data distribution, where the child could be an Exchange, or a 
`HashAggregateExec` which already
+   * provides the expected data distribution.
+   *
+   * The second thing it checks that the child provides the expected 
number of partitions.
+   *
+   * The third thing it checks that we don't add an unnecessary shuffle 
in-between
+   * `StateStoreRestoreExec` and `StateStoreSaveExec`.
+   */
+  private def checkAggregationChain(
+  se: StreamExecution,
+  expectShuffling: Boolean,
+  expectedPartition: Int): Boolean = {
+val executedPlan = se.lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition,
+  "Didn't get the expected number of partitions.")
+if (expectShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+true
+  }
+
+  /** Add blocks of data to the `BlockRDDBackedSource`. */
+  case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) 
extends AddData {
+override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
+  if (data.nonEmpty) {
+data.foreach(source.addData)
+  } else {
+// we would like to create empty blockRDD's so add an empty block 
here.
+source.addData()
+  }
+  source.releaseLock()
+  (source, LongOffset(source.counter))
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned to 1") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+testStream(aggregated, Complete())(
+  AddBlockData(inputSource, Seq(1)),
+  CheckLastBatch(1),
+  AssertOnQuery("Verify no shuffling") { se =>
+checkAggregationChain(se, expectShuffling = false, 1)
+  },
+  AddBlockData(inputSource), // create an empty trigger
+  CheckLastBatch(1),
+  AssertOnQuery("Verify addition of exchange operator") { se =>
+checkAggregationChain(se, expectShuffling = true, 1)
+  },
+  AddBlockData(inputSource, Seq(2, 3)),
+  CheckLastBatch(3),
+  AddBlockData(inputSource),
+  CheckLastBatch(3),
+  StopStream
+)
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has 
keyExpressions") {
+val inputSource = new BlockRDDBackedSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+
+def createDf(partitions: Int): Dataset[(Long, Long)] = {
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(partitions)
+.groupBy('a % 1) // just to give it a fake key
+.count()
+.as[(Long, Long)]
+}
+
+testStream(createDf(1), Complete())(
+  StartStream(chec

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138988133
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -200,18 +202,31 @@ case class StateStoreRestoreExec(
   sqlContext.sessionState,
   Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) 
=>
 val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
-iter.flatMap { row =>
-  val key = getKey(row)
-  val savedState = store.get(key)
-  numOutputRows += 1
-  row +: Option(savedState).toSeq
+val hasInput = iter.hasNext
+if (!hasInput && keyExpressions.isEmpty) {
--- End diff --

there weren't any docs in batch :)
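For context, a hedged sketch of the kind of comment that could document this branch (the wording is illustrative, based on the test behaviour discussed in this thread, not on the final code):

    // No input rows in this trigger and no grouping keys means this is a global aggregation
    // over an empty batch. We still emit the previously saved state row so that the
    // downstream save/aggregation keeps the prior result instead of producing nothing.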


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138759679
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -117,8 +119,33 @@ class IncrementalExecution(
 }
   }
 
-  override def preparations: Seq[Rule[SparkPlan]] = state +: 
super.preparations
+  override def preparations: Seq[Rule[SparkPlan]] = Seq(
+state,
+EnsureStatefulOpPartitioning) ++ super.preparations
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
+
+object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+  // Needs to be transformUp to avoid extra shuffles
+  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+case ss: StatefulOperator =>
--- End diff --

nit: why `ss`? how about `so` or `op` or `stateOp`


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138761881
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,233 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  private def checkAggregationChain(
+  sq: StreamingQuery,
+  requiresShuffling: Boolean,
+  expectedPartition: Int): Unit = {
+val executedPlan = 
sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
+  .lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition)
+if (requiresShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned accordingly") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+val sq = aggregated.writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  checkAggregationChain(sq, requiresShuffling = false, 1)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(sq, requiresShuffling = true, 1)
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  inputSource.addData(2, 3)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+3L)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+3L)
+} finally {
+  sq.stop()
+}
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has 
keyExpressions") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+
+val sq = spark.readStream
+  .format((new MockSourceProvider).getClass.getCanonicalName)
+  .load()
+  .coalesce(1)
+  .groupBy('a % 1) // just to give it a fake key
+  .count()
+  .as[(Long, Long)]
+  .writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(
+sq,
+requiresShuffling = true,
+spark.sessionState.conf.numShufflePartitions)
+
+  checkDataset(
+spark.table("agg_test").as[(Long, Long)],
+(0L, 1L))
+
+} finally {
+  sq.stop()
+   

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138762394
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,233 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  private def checkAggregationChain(
+  sq: StreamingQuery,
+  requiresShuffling: Boolean,
+  expectedPartition: Int): Unit = {
+val executedPlan = 
sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
+  .lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition)
+if (requiresShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned accordingly") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+val sq = aggregated.writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  checkAggregationChain(sq, requiresShuffling = false, 1)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(sq, requiresShuffling = true, 1)
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  inputSource.addData(2, 3)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+3L)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+3L)
+} finally {
+  sq.stop()
+}
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has 
keyExpressions") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+
+val sq = spark.readStream
+  .format((new MockSourceProvider).getClass.getCanonicalName)
+  .load()
+  .coalesce(1)
+  .groupBy('a % 1) // just to give it a fake key
+  .count()
+  .as[(Long, Long)]
+  .writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(
+sq,
+requiresShuffling = true,
+spark.sessionState.conf.numShufflePartitions)
+
+  checkDataset(
+spark.table("agg_test").as[(Long, Long)],
+(0L, 1L))
+
+} finally {
+  sq.stop()
+   

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138760053
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -800,6 +800,7 @@ class StreamExecution(
 if (streamDeathCause != null) {
   throw streamDeathCause
 }
+if (!isActive) return
--- End diff --

+1 good catch


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138765597
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,233 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  private def checkAggregationChain(
+  sq: StreamingQuery,
+  requiresShuffling: Boolean,
+  expectedPartition: Int): Unit = {
+val executedPlan = 
sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
+  .lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition)
+if (requiresShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: 
${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after 
`StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further 
exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be 
repartitioned accordingly") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+val sq = aggregated.writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  checkAggregationChain(sq, requiresShuffling = false, 1)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(sq, requiresShuffling = true, 1)
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  inputSource.addData(2, 3)
--- End diff --

This is a lot of duplicate code. I am sure you can create shortcuts like AddData and AddFileData for this source, and then use `testStream()`. All of the checkAggregationChain calls can be put inside an AssertOnQuery.
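
For illustration, a rough sketch of that shape (untested; `AddNonLocalData` is a hypothetical helper, and the `AddData`/`AssertOnQuery` signatures are assumed from `StreamTest`, reusing the suite's existing imports):

```scala
// Hypothetical shortcut action for NonLocalRelationSource, modeled on AddData.
case class AddNonLocalData(source: NonLocalRelationSource, data: Int*) extends AddData {
  override def addData(query: Option[StreamExecution]): (Source, Offset) = {
    source.addData(data: _*)
    source.releaseLock()
    // assumes the mock source can report the offset of the data it just added
    (source, source.getOffset.get)
  }
}

testStream(aggregated, OutputMode.Complete)(
  AddNonLocalData(inputSource, 1),
  CheckLastBatch(1L),
  AssertOnQuery { q =>
    // same structural check as checkAggregationChain, inlined: with
    // coalesce(1) and no grouping keys, no extra shuffle should be needed
    q.lastExecution.executedPlan.collect { case e: Exchange => e }.isEmpty
  }
)
```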


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138771978
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,233 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  private def checkAggregationChain(
+  sq: StreamingQuery,
+  requiresShuffling: Boolean,
+  expectedPartition: Int): Unit = {
+val executedPlan = 
sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
+  .lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition)
+if (requiresShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: ${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after `StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be repartitioned accordingly") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+val aggregated: Dataset[Long] =
+  spark.readStream
+.format((new MockSourceProvider).getClass.getCanonicalName)
+.load()
+.coalesce(1)
+.groupBy()
+.count()
+.as[Long]
+
+val sq = aggregated.writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  checkAggregationChain(sq, requiresShuffling = false, 1)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(sq, requiresShuffling = true, 1)
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+1L)
+
+  inputSource.addData(2, 3)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+3L)
+
+  inputSource.addData()
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkDataset(
+spark.table("agg_test").as[Long],
+3L)
+} finally {
+  sq.stop()
+}
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has keyExpressions") {
+val inputSource = new NonLocalRelationSource(spark)
+MockSourceProvider.withMockSources(inputSource) {
+  withTempDir { tempDir =>
+
+val sq = spark.readStream
+  .format((new MockSourceProvider).getClass.getCanonicalName)
+  .load()
+  .coalesce(1)
+  .groupBy('a % 1) // just to give it a fake key
+  .count()
+  .as[(Long, Long)]
+  .writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("agg_test")
+  .option("checkpointLocation", tempDir.getAbsolutePath)
+  .start()
+
+try {
+
+  inputSource.addData(1)
+  inputSource.releaseLock()
+  sq.processAllAvailable()
+
+  checkAggregationChain(
+sq,
+requiresShuffling = true,
+spark.sessionState.conf.numShufflePartitions)
+
+  checkDataset(
+spark.table("agg_test").as[(Long, Long)],
+(0L, 1L))
+
+} finally {
+  sq.stop()
+   

[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138771941
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,233 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  private def checkAggregationChain(
+  sq: StreamingQuery,
+  requiresShuffling: Boolean,
+  expectedPartition: Int): Unit = {
+val executedPlan = 
sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
+  .lastExecution.executedPlan
+val restore = executedPlan
+  .collect { case ss: StateStoreRestoreExec => ss }
+  .head
+restore.child match {
+  case node: UnaryExecNode =>
+assert(node.outputPartitioning.numPartitions === expectedPartition)
+if (requiresShuffling) {
+  assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: ${node.child}")
+} else {
+  assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle")
+}
+
+  case _ => fail("Expected no shuffling")
+}
+var reachedRestore = false
+// Check that there should be no exchanges after `StateStoreRestoreExec`
+executedPlan.foreachUp { p =>
+  if (reachedRestore) {
+assert(!p.isInstanceOf[Exchange], "There should be no further exchanges")
+  } else {
+reachedRestore = p.isInstanceOf[StateStoreRestoreExec]
+  }
+}
+  }
+
+  test("SPARK-21977: coalesce(1) with 0 partition RDD should be repartitioned accordingly") {
--- End diff --

What does "accordingly" mean? This test name can be improved.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138760476
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -200,18 +202,31 @@ case class StateStoreRestoreExec(
   sqlContext.sessionState,
   Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) 
=>
 val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
-iter.flatMap { row =>
-  val key = getKey(row)
-  val savedState = store.get(key)
-  numOutputRows += 1
-  row +: Option(savedState).toSeq
+val hasInput = iter.hasNext
+if (!hasInput && keyExpressions.isEmpty) {
--- End diff --

Add docs on why we are doing this, similar to the docs in other places related to batch aggregation.
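
For example, something along these lines above the empty-input branch (suggested wording only, not part of the patch):

```scala
// SPARK-21977: when `keyExpressions` is empty we are restoring state for a
// global aggregation. Even if this partition's input iterator is empty, the
// HashAggregateExec above this restore still emits a row (e.g. count()
// produces 0 on no data). If we returned nothing here, that row would
// overwrite the previously saved counts in StateStoreSaveExec. So on empty
// input with no grouping keys we emit the previously stored state instead,
// analogous to how batch aggregation special-cases empty inputs.
```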


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138762847
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -381,4 +388,233 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest
   AddData(streamInput, 0, 1, 2, 3),
   CheckLastBatch((0, 0, 2), (1, 1, 3)))
   }
+
+  private def checkAggregationChain(
--- End diff --

What does it check about the aggregation chain? Add docs for any such complex functions.
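
For instance, a doc along these lines (suggested wording only):

```scala
/**
 * Verifies the physical plan of the last executed micro-batch of `sq`:
 *  - the direct child of StateStoreRestoreExec has `expectedPartition`
 *    partitions, and is an Exchange iff `requiresShuffling` is true;
 *  - no Exchange appears above StateStoreRestoreExec, i.e. the restored state
 *    and the newly computed aggregates stay co-partitioned all the way to
 *    StateStoreSaveExec.
 */
private def checkAggregationChain(
    sq: StreamingQuery,
    requiresShuffling: Boolean,
    expectedPartition: Int): Unit = {
  // body unchanged from the diff above
}
```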


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138772306
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -873,6 +875,96 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
 assert(e.getMessage === "The output mode of function should be append or update")
   }
 
+  test("SPARK-21977: coalesce(1) should still be repartitioned when it has keyExpressions") {
--- End diff --

What changed code paths does this test cover that are not already covered by the other tests you added?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138771336
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -117,8 +119,33 @@ class IncrementalExecution(
 }
   }
 
-  override def preparations: Seq[Rule[SparkPlan]] = state +: 
super.preparations
+  override def preparations: Seq[Rule[SparkPlan]] = Seq(
+state,
+EnsureStatefulOpPartitioning) ++ super.preparations
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
+
+object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+  // Needs to be transformUp to avoid extra shuffles
+  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+case ss: StatefulOperator =>
+  val numPartitions = 
plan.sqlContext.sessionState.conf.numShufflePartitions
+  val keys = ss.keyExpressions
--- End diff --

I think that is a better idea. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138756939
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -117,8 +119,33 @@ class IncrementalExecution(
 }
   }
 
-  override def preparations: Seq[Rule[SparkPlan]] = state +: 
super.preparations
+  override def preparations: Seq[Rule[SparkPlan]] = Seq(
--- End diff --

This is an odd break-up of the line. How about:
```
override def preparations: Seq[Rule[SparkPlan]] =
   Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-12 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138414658
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -53,6 +53,8 @@ case class StatefulOperatorStateInfo(
 trait StatefulOperator extends SparkPlan {
   def stateInfo: Option[StatefulOperatorStateInfo]
 
+  def keyExpressions: Seq[Attribute]
--- End diff --

we don't need to expose this if we don't want to


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-12 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r138414592
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -117,8 +119,33 @@ class IncrementalExecution(
 }
   }
 
-  override def preparations: Seq[Rule[SparkPlan]] = state +: 
super.preparations
+  override def preparations: Seq[Rule[SparkPlan]] = Seq(
+state,
+EnsureStatefulOpPartitioning) ++ super.preparations
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
+
+object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+  // Needs to be transformUp to avoid extra shuffles
+  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+case ss: StatefulOperator =>
+  val numPartitions = 
plan.sqlContext.sessionState.conf.numShufflePartitions
+  val keys = ss.keyExpressions
--- End diff --

Another option is to not expose `keyExpressions` in `StatefulOperator`, but instead use the `requiredChildDistribution` field to get the required key expressions and partitioning.
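
A rough sketch of that alternative (untested, and it omits the 0-partition-RDD guard this PR also needs):

```scala
object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
  // Needs to be transformUp to avoid extra shuffles.
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case so: StatefulOperator if so.children.length == 1 =>
      val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
      val child = so.children.head
      // Derive the target partitioning from the distribution the operator
      // already declares, instead of a new keyExpressions field.
      val expectedPartitioning = so.requiredChildDistribution.head match {
        case AllTuples                   => SinglePartition
        case ClusteredDistribution(keys) => HashPartitioning(keys, numPartitions)
        case _                           => child.outputPartitioning
      }
      if (child.outputPartitioning == expectedPartitioning) {
        so
      } else {
        so.withNewChildren(ShuffleExchange(expectedPartitioning, child) :: Nil)
      }
  }
}
```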


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-11 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/19196

[SPARK-21977] SinglePartition optimizations break certain Streaming 
Stateful Aggregation requirements

## What changes were proposed in this pull request?

This is a bit hard to explain as there are several issues here; I'll try my best. Here are the requirements to reproduce the problem:
1. A Structured Streaming Source that can generate empty RDDs with 0 partitions
2. A Structured Streaming query that uses the above source, performs a stateful aggregation (mapGroupsWithState, groupBy.count, ...), and calls coalesce(1)
The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:
Symptom 1. If the input RDD has 0 partitions, the whole lineage receives 0 partitions, nothing is executed, and the state store does not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger.
Symptom 2. Let's say there was data. In that case, if you stop your stream, replace `coalesce(1)` with `coalesce(2)`, and restart your stream, the stream will fail, because `spark.sql.shuffle.partitions - 1` StateStores will fail to find their delta files.
To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies:
If the grouping expressions are empty:
a) AllTuples distribution
b) a single physical partition
If the grouping expressions are non-empty:
a) ClusteredDistribution over the grouping expressions
b) spark.sql.shuffle.partitions number of partitions
whether or not coalesce(1) exists in the plan, and whether or not the input RDD for the trigger has any data. (A sketch of the distribution side of this is shown below.)
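
One way to express the distribution half of these conditions on the operator itself is the following (illustrative sketch only; the partition-count half still needs the explicit check described above):

```scala
// Declares what the stateful operator needs from its child, independent of
// whether coalesce(1) appears in the plan or the trigger's input RDD is empty.
override def requiredChildDistribution: Seq[Distribution] =
  if (keyExpressions.isEmpty) {
    AllTuples :: Nil                              // global aggregation
  } else {
    ClusteredDistribution(keyExpressions) :: Nil  // hash-cluster by the keys
  }
```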
Once you fix the above problem by adding an Exchange to the plan, you come across the following bug:
If you call `coalesce(1).groupBy().count()` on a streaming DataFrame and a trigger has no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, the `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count and there is no data. That row then gets stored by `StateStoreSaveExec`, causing the previous counts to be overwritten and lost.

## How was this patch tested?

Regression tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark sa-0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19196


commit b7aeed6af2aaf6eb347dd0a492a62e6530900eb5
Author: Burak Yavuz 
Date:   2017-09-08T18:36:02Z

couldn't repro

commit 4a7d1240196cc4660d33aef33d893526da5f0ceb
Author: Burak Yavuz 
Date:   2017-09-11T17:44:15Z

save

commit 00fa5923c7663f58df72937626bfadac5dc2f1fd
Author: Burak Yavuz 
Date:   2017-09-12T04:32:30Z

ready for review

commit 090044ca089870befff464d37f098c4d4fd19657
Author: Burak Yavuz 
Date:   2017-09-12T04:33:05Z

uncomment




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org