Repository: spark
Updated Branches:
  refs/heads/master 381a967a7 -> 810d59ce4
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.

## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.

We caught most instances of this in the original PR, but this one slipped through.

## How was this patch tested?

n/a

Closes #22245 from jose-torres/fixflake.

Authored-by: Jose Torres <torres.joseph.f+git...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/810d59ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/810d59ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/810d59ce

Branch: refs/heads/master
Commit: 810d59ce44e43f725d1b6d822166c2d97ff49929
Parents: 381a967
Author: Jose Torres <torres.joseph.f+git...@gmail.com>
Authored: Mon Aug 27 11:04:39 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Mon Aug 27 11:04:39 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/810d59ce/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 3216650..5d68a14 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
 import org.apache.spark.sql.streaming.Trigger
 
 // Run tests in KafkaSourceSuiteBase in continuous execution mode.
@@ -60,10 +60,10 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
         testUtils.createTopic(topic2, partitions = 5)
         eventually(timeout(streamingTimeout)) {
           assert(
-            query.lastExecution.logical.collectFirst {
-              case r: StreamingDataSourceV2Relation
-                  if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-                r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
+            query.lastExecution.executedPlan.collectFirst {
+              case scan: DataSourceV2ScanExec
+                if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+                scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
             }.exists { config =>
               // Ensure the new topic is present and the old topic is gone.
               config.knownPartitions.exists(_.topic == topic2)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org