Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17043#discussion_r103047334
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
    @@ -152,6 +157,56 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister with Stre
           endingRelationOffsets)
       }
     
    +  override def createSink(
    +      sqlContext: SQLContext,
    +      parameters: Map[String, String],
    +      partitionColumns: Seq[String],
    +      outputMode: OutputMode): Sink = {
    +    val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase, v) }
    +    val defaultTopic = 
caseInsensitiveParams.get(TOPIC_OPTION_KEY).map(_.trim.toLowerCase)
    +    val specifiedKafkaParams =
    --- End diff --
    
    Need to throw an exception if the user specifies serializer like the 
source. Also need to add tests.
    ```
        if 
(caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}"))
 {
          throw new IllegalArgumentException(
            s"Kafka option '${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}' is 
not supported as keys "
              + "are deserialized as byte arrays with ByteArrayDeserializer. 
Use DataFrame operations "
              + "to explicitly deserialize the keys.")
        }
    
        if 
(caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}"))
        {
          throw new IllegalArgumentException(
            s"Kafka option '${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}' 
is not supported as "
              + "value are deserialized as byte arrays with 
ByteArrayDeserializer. Use DataFrame "
              + "operations to explicitly deserialize the values.")
        }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to