GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22547#discussion_r220275520
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
---
@@ -207,13 +207,13 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
testUtils.createTopic(topic2, partitions = 5)
eventually(timeout(streamingTimeout)) {
assert(
- query.lastExecution.executedPlan.collectFirst {
- case scan: DataSourceV2ScanExec
- if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
- }.exists { config =>
+ query.lastExecution.logical.collectFirst {
--- End diff --
The known partitions are now tracked by the `KafkaContinuousInputStream` in the
logical plan, so the test looks them up there instead of in the executed plan:
https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62
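
For readers unfamiliar with the `collectFirst` lookup used in the test, here is a minimal, self-contained sketch of the pattern. It is a toy model only: `PlanNode`, `Project`, `StreamRelation`, `ToyInputStream`, and `PlanSearch` are hypothetical names made up for illustration and are not Spark classes or the API introduced by this pull request.

```scala
// Toy stand-ins for plan nodes. None of these are Spark classes.
sealed trait PlanNode { def children: Seq[PlanNode] }

// Hypothetical stream object that tracks the partitions it knows about.
case class ToyInputStream(knownPartitions: Set[Int])

// Hypothetical inner node with a single child.
case class Project(child: PlanNode) extends PlanNode {
  val children: Seq[PlanNode] = Seq(child)
}

// Hypothetical leaf node that carries the stream.
case class StreamRelation(stream: ToyInputStream) extends PlanNode {
  val children: Seq[PlanNode] = Nil
}

object PlanSearch {
  // Pre-order search: return the first result the partial function produces.
  def collectFirst[A](plan: PlanNode)(pf: PartialFunction[PlanNode, A]): Option[A] =
    pf.lift(plan) match {
      case found @ Some(_) => found
      case None =>
        // The iterator is lazy, so the search stops at the first match.
        plan.children.iterator
          .map(child => collectFirst(child)(pf))
          .collectFirst { case Some(a) => a }
    }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val logical: PlanNode =
      Project(StreamRelation(ToyInputStream(Set(0, 1, 2, 3, 4))))

    // Find the stream in the plan, then check what it tracks.
    val stream = PlanSearch.collectFirst(logical) {
      case StreamRelation(s) => s
    }
    assert(stream.exists(_.knownPartitions.size == 5))
  }
}
```

The point of the sketch is only the shape of the check: locate the node that holds the stream, then assert on the state the stream tracks.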
---