GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220275520
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---
    @@ -207,13 +207,13 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
             testUtils.createTopic(topic2, partitions = 5)
             eventually(timeout(streamingTimeout)) {
               assert(
    -            query.lastExecution.executedPlan.collectFirst {
    -              case scan: DataSourceV2ScanExec
    -                if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    -                scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
    -            }.exists { config =>
    +            query.lastExecution.logical.collectFirst {
    --- End diff --
    
    The known partitions are now tracked by the `KafkaContinuousInputStream` in the logical plan, so the test looks them up there rather than in the executed plan:
    https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62
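
As a rough illustration of that lookup pattern only: the sketch below uses a toy plan tree, not Spark's classes. `Plan`, `Project`, `StreamRelation`, and `knownPartitions` are hypothetical names invented for this example, and `collectFirst` here is a hand-written pre-order search that merely resembles the one on Spark's tree nodes.

```scala
// Toy stand-ins for a logical plan tree; these are NOT Spark's classes.
sealed trait Plan {
  def children: Seq[Plan]

  // Pre-order search: try this node first, then each child in order.
  def collectFirst[B](pf: PartialFunction[Plan, B]): Option[B] =
    pf.lift(this).orElse(
      children.foldLeft(Option.empty[B])((acc, c) => acc.orElse(c.collectFirst(pf))))
}

// Hypothetical unary node wrapping another plan.
final case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

// Hypothetical leaf that records the partitions its input stream knows about.
final case class StreamRelation(knownPartitions: Set[String]) extends Plan {
  def children: Seq[Plan] = Nil
}

object Demo {
  def main(args: Array[String]): Unit = {
    val plan: Plan = Project(StreamRelation(Set("topic2-0", "topic2-1")))
    // Find the first node that carries partition state and read it.
    val found = plan.collectFirst { case r: StreamRelation => r.knownPartitions }
    assert(found.exists(_.exists(_.startsWith("topic2"))))
  }
}
```

The point is only that the assertion reads state from a node in the plan tree; which node actually holds the partitions in Spark is whatever the linked change defines.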

