Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20941#discussion_r178208005
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -444,6 +445,26 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             }
           }
     
    +      if (currentStream.isInstanceOf[MicroBatchExecution]) {
    +        // Verify if stateful operators have correct metadata and distribution
    +        // This can often catch hard to debug errors when developing stateful operators
    +        val executedPlan = currentStream.lastExecution.executedPlan
    +        executedPlan.collect { case s: StatefulOperator => s }.foreach { s =>
    +          assert(s.stateInfo.isDefined)
    +          assert(s.stateInfo.get.numPartitions >= 1)
    +
    +          s.requiredChildDistribution.foreach { d =>
    +            withClue(s"$s specifies incorrect # partitions in requiredChildDistribution $d") {
    +              assert(d.requiredNumPartitions.isDefined)
    +              assert(d.requiredNumPartitions.get >= 1)
    +              if (d != AllTuples) {
    +                assert(d.requiredNumPartitions.get == s.stateInfo.get.numPartitions)
    --- End diff --
    
    Can you also verify that this is equal to the number of partitions in the metadata?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to