[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19467


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144403807
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
@@ -50,7 +50,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   numPartitions: Int): Partitioning = {
 requiredDistribution match {
   case AllTuples => SinglePartition
-  case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
+  case ClusteredDistribution(clustering, desiredPartitions) =>
--- End diff --

Update the Scaladoc to say that the numPartitions param is used only if the 
distribution does not specify a number of partitions itself.
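
A minimal sketch of the fallback behaviour being asked for in the docs. The types below are simplified, hypothetical stand-ins (clustering keys as strings), not Spark's real classes, and the use of getOrElse is an assumption, since the diff excerpt above is cut off before the body of the new case:

```scala
// Simplified, hypothetical stand-ins for Spark's Distribution and Partitioning types.
sealed trait Distribution
case object AllTuples extends Distribution
final case class ClusteredDistribution(
    clustering: Seq[String],
    desiredPartitions: Option[Int] = None) extends Distribution

sealed trait Partitioning
case object SinglePartition extends Partitioning
final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)
  extends Partitioning

object PartitioningSketch {
  /**
   * Builds a partitioning that satisfies `required`.
   *
   * @param numPartitions fallback only; used when the distribution itself
   *                      does not specify a number of partitions.
   */
  def createPartitioning(required: Distribution, numPartitions: Int): Partitioning =
    required match {
      case AllTuples => SinglePartition
      case ClusteredDistribution(clustering, desiredPartitions) =>
        // Assumed fallback: prefer the distribution's own count when present.
        HashPartitioning(clustering, desiredPartitions.getOrElse(numPartitions))
    }
}
```

With this shape, a distribution that carries Some(5) wins over the numPartitions argument, while a distribution with None falls back to it.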





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144434463
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StatefulOperatorTest.scala ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.streaming._
+
+trait StatefulOperatorTest {
+  /**
+   * Check that the output partitioning of a child operator of a Stateful operator satisfies the
+   * distribution that we expect for our Stateful operator.
+   */
+  protected def checkChildOutputPartitioning[T <: StatefulOperator](
--- End diff --

Since this is only for HashPartitioning, how about naming it 
checkChildOutputHashPartitioning?





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144434390
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StatefulOperatorTest.scala ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.streaming._
+
+trait StatefulOperatorTest {
+  /**
+   * Check that the output partitioning of a child operator of a Stateful operator satisfies the
+   * distribution that we expect for our Stateful operator.
+   */
+  protected def checkChildOutputPartitioning[T <: StatefulOperator](
+  sq: StreamingQuery,
+  colNames: Seq[String],
+  numPartitions: Option[Int] = None): Boolean = {
--- End diff --

numPartitions is never used.






[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144433715
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala ---
@@ -214,7 +214,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
   path: String,
   queryRunId: UUID = UUID.randomUUID,
   version: Int = 0): StatefulOperatorStateInfo = {
-StatefulOperatorStateInfo(path, queryRunId, operatorId = 0, version)
--- End diff --

super nit: numPartitions = 5





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144402006
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -243,8 +246,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
   override def satisfies(required: Distribution): Boolean = required match {
 case UnspecifiedDistribution => true
-case ClusteredDistribution(requiredClustering) =>
-  expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+case ClusteredDistribution(requiredClustering, desiredPartitions) =>
+  expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
+desiredPartitions.forall(_ == numPartitions) // if desiredPartition = true, returns true
--- End diff --

// if desiredPartition**s** is **None**, return true
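
For reference, a small self-contained sketch of why the corrected comment is the accurate one: Option.forall returns true for None, so the partition-count check only constrains the result when a count was actually requested. The helper name partitionCountMatches is made up for illustration and is not part of the patch:

```scala
object OptionForallSketch {
  /**
   * True when no partition count is requested (None), or when the requested
   * count equals the partitioning's own numPartitions.
   */
  def partitionCountMatches(desiredPartitions: Option[Int], numPartitions: Int): Boolean =
    desiredPartitions.forall(_ == numPartitions)
}
```

So None never rejects a partitioning, Some(n) accepts only a partitioning with exactly n partitions, and the clustering-expression check still has to pass in both cases.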





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144408406
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala ---
@@ -53,7 +53,7 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
   test("ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning") {
 testEnsureStatefulOpPartitioning(
   baseDf.coalesce(1).queryExecution.sparkPlan,
-  requiredDistribution = keys => ClusteredDistribution(keys),
--- End diff --

This test suite no longer makes sense, because the rule it tests does not 
exist anymore. If we add tests to the related PlannerSuite that cover the new 
additions in EnsureRequirements and Partitioning, then here we only need to 
test whether each stateful operator specifies numPartitions in its required 
distribution.





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144406045
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -43,10 +43,11 @@ case class StatefulOperatorStateInfo(
 checkpointLocation: String,
 queryRunId: UUID,
 operatorId: Long,
-storeVersion: Long) {
+storeVersion: Long,
+numPartitions: Int) {
--- End diff --

nice!





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144406113
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -239,7 +240,7 @@ case class StateStoreRestoreExec(
 if (keyExpressions.isEmpty) {
   AllTuples :: Nil
 } else {
-  ClusteredDistribution(keyExpressions) :: Nil
+  ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
--- End diff --

We need tests for these.





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-12 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144335223
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-child.execute().coalesce(numPartitions, shuffle = false)
+if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
--- End diff --

The existing tests for the original problem should catch it.





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144154295
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-child.execute().coalesce(numPartitions, shuffle = false)
+if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
--- End diff --

Add a test in DatasetSuite that covers this empty-RDD case, maybe in the 
same test as the existing coalesce test.





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152923
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
@@ -131,17 +132,17 @@ class IncrementalExecution(
   }
 
   override def preparations: Seq[Rule[SparkPlan]] =
-Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
+Seq(state, EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ super.preparations
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
 
-object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+case class EnsureStatefulOpPartitioning(conf: SQLConf) extends Rule[SparkPlan] {
   // Needs to be transformUp to avoid extra shuffles
   override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
 case so: StatefulOperator =>
-  val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
+  val numPartitions = conf.numShufflePartitions
--- End diff --

Why this change? Doesn't the plan have the same context and conf?





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152859
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
@@ -131,17 +132,17 @@ class IncrementalExecution(
   }
 
   override def preparations: Seq[Rule[SparkPlan]] =
-Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
+Seq(state, EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ super.preparations
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
 
-object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+case class EnsureStatefulOpPartitioning(conf: SQLConf) extends Rule[SparkPlan] {
   // Needs to be transformUp to avoid extra shuffles
   override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
 case so: StatefulOperator =>
-  val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
+  val numPartitions = conf.numShufflePartitions
--- End diff --

Why this change?





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152254
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-child.execute().coalesce(numPartitions, shuffle = false)
+if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
+  // Make sure we don't output an RDD with 0 partitions, when claiming that we have a
+  // `SinglePartition`.
+  new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
+} else {
+  child.execute().coalesce(numPartitions, shuffle = false)
+}
   }
 }
 
+object CoalesceExec {
+  /** A simple RDD with no data, but with the given number of partitions. */
+  class EmptyRDDWithPartitions(
+  @transient private val sc: SparkContext,
+  numPartitions: Int) extends RDD[InternalRow](sc, Nil) {
+
+override def getPartitions: Array[Partition] =
+  Array.tabulate(numPartitions)(i => SimplePartition(i))
+
+override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+  Iterator.empty
+}
+  }
+
+  case class SimplePartition(index: Int) extends Partition
--- End diff --

nit: EmptyPartition? Isn't that more descriptive than "simple"?





[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-10 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/19467

[SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning

## What changes were proposed in this pull request?

In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan 
has the expected partitioning for Streaming Stateful Operators. The problem is 
that we are not allowed to access this information during planning.
We added that check because CoalesceExec could actually create RDDs with 0 
partitions. We should fix it so that when CoalesceExec reports a 
SinglePartition, the inputRDD in fact has 1 partition rather than 0.
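
As a rough illustration of the intended behaviour, here is a dependency-free sketch that models only partition counts. It assumes, as a simplification, that coalescing without a shuffle yields min(child partitions, requested partitions); the object and method names are hypothetical and none of this is Spark's actual implementation:

```scala
object CoalesceGuardSketch {
  // Simplified assumption: coalescing without a shuffle can only keep or
  // reduce the partition count, so a child with 0 partitions stays at 0.
  def coalescedPartitions(childPartitions: Int, numPartitions: Int): Int =
    math.min(childPartitions, numPartitions)

  // Guarded variant: when a single partition is requested and the child has
  // none, report one (empty) partition so the count agrees with SinglePartition.
  def guardedPartitions(childPartitions: Int, numPartitions: Int): Int =
    if (numPartitions == 1 && childPartitions < 1) numPartitions
    else coalescedPartitions(childPartitions, numPartitions)
}
```

Under this model, an empty child coalesced to 1 partition gives 0 partitions without the guard and 1 with it, while non-empty children are unaffected.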

## How was this patch tested?

Regression test in StreamingQuerySuite


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark stateful-op

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19467.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19467


commit 961ade120f7a179751e5ec45b24e159259de0bae
Author: Burak Yavuz 
Date:   2017-10-10T22:02:48Z

Fix plan resolution bug caused by EnsureStatefulOpPartitioning



