Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22547#discussion_r220275016
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
---
@@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends
DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}
- /**
- * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
- * batches of Kafka data in a micro-batch streaming query.
- */
- override def createMicroBatchReadSupport(
- metadataPath: String,
- options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
- val parameters = options.asMap().asScala.toMap
- validateStreamOptions(parameters)
- // Each running query should use its own group id. Otherwise, the query may be only assigned
- // partial data since Kafka will assign partitions to multiple consumers having the same group
- // id. Hence, we should generate a unique id for each query.
- val uniqueGroupId =
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
- val caseInsensitiveParams = parameters.map { case (k, v) =>
(k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams =
- parameters
- .keySet
- .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
- .map { k => k.drop(6).toString -> parameters(k) }
- .toMap
-
- val startingStreamOffsets =
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
- STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
- val kafkaOffsetReader = new KafkaOffsetReader(
- strategy(caseInsensitiveParams),
- kafkaParamsForDriver(specifiedKafkaParams),
- parameters,
- driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
- new KafkaMicroBatchReadSupport(
- kafkaOffsetReader,
- kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
- options,
- metadataPath,
- startingStreamOffsets,
- failOnDataLoss(caseInsensitiveParams))
+ override def getTable(options: DataSourceOptions): KafkaTable.type = {
+ KafkaTable
}
- /**
- * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
- * Kafka data in a continuous streaming query.
- */
- override def createContinuousReadSupport(
- metadataPath: String,
- options: DataSourceOptions): KafkaContinuousReadSupport = {
- val parameters = options.asMap().asScala.toMap
- validateStreamOptions(parameters)
- // Each running query should use its own group id. Otherwise, the query may be only assigned
- // partial data since Kafka will assign partitions to multiple consumers having the same group
- // id. Hence, we should generate a unique id for each query.
- val uniqueGroupId =
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
- val caseInsensitiveParams = parameters.map { case (k, v) =>
(k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams =
- parameters
- .keySet
- .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
- .map { k => k.drop(6).toString -> parameters(k) }
- .toMap
+ object KafkaTable extends Table
+ with SupportsMicroBatchRead with SupportsContinuousRead {
- val startingStreamOffsets =
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
- STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+ override def schema(): StructType = KafkaOffsetReader.kafkaSchema
- val kafkaOffsetReader = new KafkaOffsetReader(
- strategy(caseInsensitiveParams),
- kafkaParamsForDriver(specifiedKafkaParams),
- parameters,
- driverGroupIdPrefix = s"$uniqueGroupId-driver")
+ /**
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to read
+ * batches of Kafka data in a micro-batch streaming query.
+ */
+ override def createMicroBatchInputStream(
+ checkpointLocation: String,
+ config: ScanConfig,
+ options: DataSourceOptions): MicroBatchInputStream = {
+ val parameters = options.asMap().asScala.toMap
--- End diff --
This code was moved from
https://github.com/apache/spark/pull/22547/files#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05L117
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]