[ https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601778#comment-15601778 ]
zhangxinyu edited comment on SPARK-17935 at 10/26/16 3:26 AM:
--------------------------------------------------------------

h2. KafkaSink Design Doc

h4. Goal
Output results to a Kafka cluster (version 0.10.0.0) from the structured streaming module.

h4. Implementation
Four classes are implemented to output data to a Kafka cluster from the structured streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister* and overrides the functions *shortName* and *createSink*. *KafkaSink* is created in *createSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, in which a *KafkaSinkRDD* is created.
* *KafkaSinkRDD*
*KafkaSinkRDD* extends *RDD* and is designed to send results to the Kafka cluster in a distributed way. In its *compute* function, *CachedKafkaProducer* is called to get or create a producer to send the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* stores producers in the executors so that they can be reused.

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed through *.option()* with keys that start with "*kafka.*". For example, the producer setting *bootstrap.servers* is configured by *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed through *.option()*; the difference is that their keys do not start with "kafka.".

h4. Usage
val query = input.writeStream
  .format("kafka-sink-10")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()

was (Author: zhangxinyu):
h2. KafkaSink Design Doc

h4. Goal
Output results to a Kafka cluster (version 0.10.0.0) from the structured streaming module.

h4. Implementation
Four classes are implemented to output data to a Kafka cluster from the structured streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister* and overrides the functions *shortName* and *createSink*. *KafkaSink* is created in *createSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, in which a *KafkaSinkRDD* is created.
* *KafkaSinkRDD*
*KafkaSinkRDD* extends *RDD* and is designed to send results to the Kafka cluster in a distributed way. In its *compute* function, *CachedKafkaProducer* is called to get or create a producer to send the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* stores producers in the executors so that they can be reused.

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed through *.option()* with keys that start with "*kafka.*". For example, the producer setting *bootstrap.servers* is configured by *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed through *.option()*; the difference is that their keys do not start with "kafka.".

h4. Usage
val query = input.writeStream
  .format("kafkaSink")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()

> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-17935
>                 URL: https://issues.apache.org/jira/browse/SPARK-17935
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Streaming
>    Affects Versions: 2.0.0
>            Reporter: zhangxinyu
>
> Spark already supports kafkaInputStream. It would be useful to add
> `KafkaForeachWriter` to output results to Kafka in the structured streaming
> module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.
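
The configuration convention in the design doc (producer settings carry a "kafka." prefix, everything else is a sink-level option) and the producer reuse attributed to *CachedKafkaProducer* could be sketched in plain Scala roughly as follows. This is only an illustration, not the code proposed in the issue: `SinkOptions`, `splitOptions`, `ProducerCache`, and `getOrCreate` are hypothetical names, stripping the prefix before handing settings to the producer is an assumption, and a generic factory function stands in for building a real Kafka producer so that the snippet needs neither Spark nor Kafka on the classpath.

```scala
import scala.collection.mutable

// Hypothetical helper: separates producer settings from sink-level options.
object SinkOptions {
  private val KafkaPrefix = "kafka."

  /** Returns (producer settings with the "kafka." prefix stripped, remaining options). */
  def splitOptions(options: Map[String, String]): (Map[String, String], Map[String, String]) = {
    val (prefixed, other) = options.partition { case (key, _) => key.startsWith(KafkaPrefix) }
    // Assumption: the producer expects plain keys such as "bootstrap.servers".
    val producerConf = prefixed.map { case (key, value) => key.stripPrefix(KafkaPrefix) -> value }
    (producerConf, other)
  }
}

// Hypothetical per-JVM cache holding one producer per distinct configuration.
// P stands in for a real producer type; `create` is whatever constructs it.
class ProducerCache[P](create: Map[String, String] => P) {
  private val cache = mutable.Map.empty[Map[String, String], P]

  /** Returns the cached producer for `conf`, creating it on first use. */
  def getOrCreate(conf: Map[String, String]): P = synchronized {
    cache.getOrElseUpdate(conf, create(conf))
  }

  /** Number of producers currently cached. */
  def size: Int = synchronized(cache.size)
}
```

Under these assumptions, a task running in *compute* would call something like `getOrCreate` with the stripped producer settings, so that tasks sharing an executor JVM and a configuration also share one producer instead of opening a new connection per batch.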