Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168626487
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
             query.nonEmpty,
             "Cannot add data when there is no query for finding the active kafka source")
     
    -      val sources = query.get.logicalPlan.collect {
    -        case StreamingExecutionRelation(source: KafkaSource, _) => source
    -      } ++ (query.get.lastExecution match {
    -        case null => Seq()
    -        case e => e.logical.collect {
    -          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
    -        }
    -      })
    +      val sources = {
    +        query.get.logicalPlan.collect {
    +          case StreamingExecutionRelation(source: KafkaSource, _) => source
    +          case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
    +        } ++ (query.get.lastExecution match {
    +          case null => Seq()
    +          case e => e.logical.collect {
    +            case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
    +          }
    +        })
    +      }.distinct
    --- End diff --
    
    yes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to