[ 
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601778#comment-15601778
 ] 

zhangxinyu edited comment on SPARK-17935 at 10/31/16 7:53 AM:
--------------------------------------------------------------

h2. KafkaSink Design Doc

h4. Goal
Output results to a Kafka cluster (version 0.10.0.0) from the Structured 
Streaming module.

h4. Implementation
Four classes are implemented to output data to a Kafka cluster from the 
Structured Streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister* 
and overrides the functions *shortName* and *createSink*. *createSink* creates 
a *KafkaSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, which 
creates a *KafkaSinkRDD*.
* *CachedKafkaProducer*
*CachedKafkaProducer* stores producers on the executors so that they can be 
reused.
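
The producer reuse described for *CachedKafkaProducer* can be sketched as a get-or-create cache keyed by the producer configuration. This is a minimal illustration, not the code from the patch: the class name *ProducerCache*, its type parameter and the *create* factory are hypothetical, and a real implementation would build Kafka producers where this sketch accepts any type.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CachedKafkaProducer: keeps one producer per
// distinct configuration so that repeated requests reuse the same instance.
// `P` is the producer type; `create` builds a new producer from its config.
final class ProducerCache[P](create: Map[String, String] => P) {
  private val cache = mutable.HashMap.empty[Map[String, String], P]

  // Returns the cached producer for `config`, creating it on first use.
  def getOrCreate(config: Map[String, String]): P = synchronized {
    cache.getOrElseUpdate(config, create(config))
  }

  // Number of distinct producers currently cached.
  def size: Int = synchronized(cache.size)
}
```

Keying on the whole configuration map means two requests with identical settings share a producer, while a different *bootstrap.servers* value gets its own.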

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed through "*.option()*" with keys that start 
with "*kafka.*". For example, the producer configuration *bootstrap.servers* 
is set by *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed through "*.option()*". The difference is that 
their keys do not start with "*kafka.*".
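
One way to separate the two kinds of options is sketched below. It assumes the "kafka." prefix is stripped before the settings reach the producer, which the *bootstrap.servers* example suggests but the design doc does not state outright. The object name *SinkOptions* and the method *split* are hypothetical.

```scala
// Hypothetical helper: separates producer settings from other sink options.
object SinkOptions {
  private val KafkaPrefix = "kafka."

  // Returns (producer config with the "kafka." prefix removed, other options).
  def split(options: Map[String, String]): (Map[String, String], Map[String, String]) = {
    val (kafka, other) = options.partition { case (key, _) => key.startsWith(KafkaPrefix) }
    val producerConfig = kafka.map { case (key, value) => key.stripPrefix(KafkaPrefix) -> value }
    (producerConfig, other)
  }
}
```

For the options used in the Usage section, the first map would hold *bootstrap.servers* and the second would hold *topic*.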

h4. Usage
// kafkaServers (a host:port list) and topic are assumed to be defined elsewhere
val query = input.writeStream
  .format("kafka-sink-10")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()




was (Author: zhangxinyu):
h2. KafkaSink Design Doc

h4. Goal
Output results to a Kafka cluster (version 0.10.0.0) from the Structured 
Streaming module.

h4. Implementation
Four classes are implemented to output data to a Kafka cluster from the 
Structured Streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister* 
and overrides the functions *shortName* and *createSink*. *createSink* creates 
a *KafkaSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, which 
creates a *KafkaSinkRDD*.
* *KafkaSinkRDD*
*KafkaSinkRDD* sends results to the Kafka cluster in a distributed manner. It 
extends *RDD*. In the function *compute*, *CachedKafkaProducer* is called to 
get or create a producer that sends the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* stores producers on the executors so that they can be 
reused.

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed through "*.option()*" with keys that start 
with "*kafka.*". For example, the producer configuration *bootstrap.servers* 
is set by *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed through "*.option()*". The difference is that 
their keys do not start with "*kafka.*".

h4. Usage
// kafkaServers (a host:port list) and topic are assumed to be defined elsewhere
val query = input.writeStream
  .format("kafka-sink-10")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()



> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-17935
>                 URL: https://issues.apache.org/jira/browse/SPARK-17935
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Streaming
>    Affects Versions: 2.0.0
>            Reporter: zhangxinyu
>
> Spark already supports kafkaInputStream. It would be useful to add 
> `KafkaForeachWriter` to output results to Kafka in the Structured Streaming 
> module.
> `KafkaForeachWriter.scala` is placed in external kafka-0.8.0.



