Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22547#discussion_r220274862
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchInputStream.scala
---
@@ -294,6 +227,88 @@ private[kafka010] class KafkaMicroBatchReadSupport(
}
}
+private[kafka010] class KafkaMicroBatchScan(
+ kafkaOffsetReader: KafkaOffsetReader,
+ rangeCalculator: KafkaOffsetRangeCalculator,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean,
+ reportDataLoss: String => Unit,
+ start: KafkaSourceOffset,
+ end: KafkaSourceOffset) extends MicroBatchScan with Logging {
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ KafkaMicroBatchReaderFactory
+ }
+
+ override def planInputPartitions(): Array[InputPartition] = {
+ val startPartitionOffsets = start.partitionToOffsets
--- End diff --
moved from
https://github.com/apache/spark/pull/22547/files#diff-314d02b954fc05ec7ae687dd486a8e84L104
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]