GitHub user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226783272
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           failOnDataLoss(caseInsensitiveParams))
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
    -   * batches of Kafka data in a micro-batch streaming query.
    -   */
    -  override def createMicroBatchReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
    -
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    -
    -    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    -      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    -
    -    val kafkaOffsetReader = new KafkaOffsetReader(
    -      strategy(caseInsensitiveParams),
    -      kafkaParamsForDriver(specifiedKafkaParams),
    -      parameters,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    -
    -    new KafkaMicroBatchReadSupport(
    -      kafkaOffsetReader,
    -      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    -      options,
    -      metadataPath,
    -      startingStreamOffsets,
    -      failOnDataLoss(caseInsensitiveParams))
    +  override def getTable(options: DataSourceOptions): KafkaTable.type = {
    +    KafkaTable
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
    -   * Kafka data in a continuous streaming query.
    -   */
    -  override def createContinuousReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaContinuousReadSupport = {
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    +  object KafkaTable extends Table
    --- End diff --
    
    Why is `KafkaTable` an object rather than a class? A singleton doesn't seem to fit the `Table` abstraction.
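    
    For illustration only (these are not the real DataSourceV2 interfaces; `Table` and `DataSourceOptions` below are minimal stand-ins), here is a sketch of one possible class-based shape, where each instance captures the options it was created from:
    
    ```scala
    import java.util.Locale
    
    // Stand-ins for the sketch only; the actual Spark interfaces differ.
    trait Table {
      def name(): String
    }
    final case class DataSourceOptions(asMap: Map[String, String])
    
    // A class-based table can capture the options passed to getTable(...),
    // so every load gets its own configured instance.
    class KafkaTable(options: DataSourceOptions) extends Table {
      private val params: Map[String, String] =
        options.asMap.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    
      override def name(): String = {
        val topic = params.getOrElse("subscribe", "<unset>")
        s"KafkaTable($topic)"
      }
    }
    ```
    
    A singleton `object KafkaTable extends Table` has no constructor parameters, so it cannot capture the `options` argument handed to `getTable`; any per-query configuration would have to be threaded through elsewhere.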

