Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220274862
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchInputStream.scala ---
    @@ -294,6 +227,88 @@ private[kafka010] class KafkaMicroBatchReadSupport(
       }
     }
     
    +private[kafka010] class KafkaMicroBatchScan(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    rangeCalculator: KafkaOffsetRangeCalculator,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean,
    +    reportDataLoss: String => Unit,
    +    start: KafkaSourceOffset,
    +    end: KafkaSourceOffset) extends MicroBatchScan with Logging {
    +
    +  override def createReaderFactory(): PartitionReaderFactory = {
    +    KafkaMicroBatchReaderFactory
    +  }
    +
    +  override def planInputPartitions(): Array[InputPartition] = {
    +    val startPartitionOffsets = start.partitionToOffsets
    --- End diff --
    
    This code was moved from https://github.com/apache/spark/pull/22547/files#diff-314d02b954fc05ec7ae687dd486a8e84L104


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to