[ 
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601778#comment-15601778
 ] 

zhangxinyu edited comment on SPARK-17935 at 10/31/16 8:25 AM:
--------------------------------------------------------------

h2. KafkaSink Design Doc

h4. Goal
Output results to a Kafka cluster (version 0.10.0.0) from the Structured 
Streaming module.

h4. Implementation
The following classes are implemented to output data to a Kafka cluster from 
the Structured Streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister*, 
and overrides the functions *shortName* and *createSink*. In *createSink*, a 
*ForeachSink* backed by a *KafkaForeachWriter* is created.
* *KafkaForeachWriter*
*KafkaForeachWriter* is as proposed before. The only difference is that the 
KafkaProducer is now obtained from *CachedKafkaProducer* rather than created 
directly.
* *CachedKafkaProducer*
*CachedKafkaProducer* stores producers on the executors so that they can be 
reused.
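A minimal sketch of the caching pattern behind *CachedKafkaProducer* (the 
method name *getOrCreate* and the stand-in producer type are assumptions made 
for illustration; the real class would cache 
org.apache.kafka.clients.producer.KafkaProducer instances keyed by their 
configuration):

```scala
import scala.collection.concurrent.TrieMap

// Stand-in for org.apache.kafka.clients.producer.KafkaProducer; used here
// only so the caching pattern can be shown self-contained.
final class StubProducer(val params: Map[String, String])

object CachedKafkaProducer {
  // One producer per distinct parameter map, shared by all tasks in the JVM.
  private val cache = TrieMap.empty[Map[String, String], StubProducer]

  // Return the cached producer for these parameters, creating it at most once.
  def getOrCreate(params: Map[String, String]): StubProducer =
    cache.getOrElseUpdate(params, new StubProducer(params))
}
```

Because the cache is keyed by the full parameter map, two queries writing with 
identical producer settings share one producer, which is what makes reuse 
across batches cheap.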

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed via "*.option()*" using keys prefixed with 
"*kafka.*". For example, the producer setting *bootstrap.servers* can be 
configured with *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed via ".option()"; the only difference is that 
their keys do not start with "kafka.".
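Under the assumption that the sink separates the two kinds of options by key 
prefix (the helper and its name *splitOptions* are hypothetical, not part of 
the proposal), the split might look like:

```scala
// Hypothetical helper: options whose keys start with "kafka." are forwarded
// to the Kafka producer with the prefix stripped; the rest (e.g. "topic")
// are interpreted by the sink itself.
object KafkaSinkOptions {
  def splitOptions(
      options: Map[String, String]): (Map[String, String], Map[String, String]) = {
    val (kafka, other) = options.partition { case (k, _) => k.startsWith("kafka.") }
    (kafka.map { case (k, v) => k.stripPrefix("kafka.") -> v }, other)
  }
}
```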

h4. Usage
val query = input.writeStream
                 .format("kafka-sink-10")
                 .outputMode("append")
                 .option("kafka.bootstrap.servers", kafkaServers)
                 .option("topic", topic)
                 .start()




was (Author: zhangxinyu):
h2. KafkaSink Design Doc

h4. Goal
Output results to a Kafka cluster (version 0.10.0.0) from the Structured 
Streaming module.

h4. Implementation
The following classes are implemented to output data to a Kafka cluster from 
the Structured Streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister*, 
and overrides the functions *shortName* and *createSink*. In *createSink*, a 
*KafkaSink* is created.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*. A 
KafkaProducer is created once per JVM for the same producer parameters. Data 
is sent to the Kafka cluster in a distributed manner, similar to *ForeachSink*.
* *CachedKafkaProducer*
*CachedKafkaProducer* stores producers on the executors so that they can be 
reused.

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed via "*.option()*" using keys prefixed with 
"*kafka.*". For example, the producer setting *bootstrap.servers* can be 
configured with *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed via ".option()"; the only difference is that 
their keys do not start with "kafka.".

h4. Usage
val query = input.writeStream
                 .format("kafka-sink-10")
                 .outputMode("append")
                 .option("kafka.bootstrap.servers", kafkaServers)
                 .option("topic", topic)
                 .start()



> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-17935
>                 URL: https://issues.apache.org/jira/browse/SPARK-17935
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Streaming
>    Affects Versions: 2.0.0
>            Reporter: zhangxinyu
>
> Spark already supports a Kafka input stream. It would be useful to add 
> `KafkaForeachWriter` to output results to Kafka from the Structured 
> Streaming module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
