spark git commit: [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ca7a7e8a8 -> fd6c6d5c3

[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.

### Streaming Kafka Sink
- When addBatch is called
-- If batchId is greater than the last written batch
--- Write batch to Kafka. The topic is taken from the topic option if one is set, which overrides any topic in the record; otherwise it is taken from the record.
-- Else ignore

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider
- CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka
- The topic is taken from the topic option if one is set, which overrides any topic in the record; otherwise it is taken from the record.
- Save modes Append and ErrorIfExists are supported under identical semantics. Other save modes result in an AnalysisException

tdas zsxwing

## How was this patch tested?

### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
- write data with bad schema: various cases of writing data that does not conform to a proper schema, e.g., 1. no topic field or default topic, and 2. no value field
- write data with valid schema but wrong types: data with a complete schema but wrong types, e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
- write batch to kafka: simple write batch to Kafka, which goes through the same code path as the streaming scenario, so validity checks will not be redone here.
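The streaming sink rules described above can be modelled with a minimal, Spark-free sketch. Everything named here (`Record`, `TopicResolution`, `IdempotentSink`) is hypothetical and is not a class from this patch; the real `KafkaSink` operates on DataFrames and a Kafka producer. The sketch models only two behaviours stated in the description: a batch is written only when its id is greater than the last written one, and the topic option, when set, overrides the topic carried by the record.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a row with an optional topic column.
final case class Record(topic: Option[String], value: String)

object TopicResolution {
  // The topic option wins when set; otherwise fall back to the record's topic.
  def resolveTopic(record: Record, topicOption: Option[String]): String =
    topicOption
      .orElse(record.topic)
      .getOrElse(throw new IllegalArgumentException(
        "no topic option set and record carries no topic"))
}

// Hypothetical in-memory sink: `written` collects (topic, value) pairs
// instead of sending them to Kafka.
final class IdempotentSink(topicOption: Option[String]) {
  private var latestBatchId: Long = -1L
  val written: ArrayBuffer[(String, String)] = ArrayBuffer.empty

  // Returns true if the batch was written, false if it was ignored
  // because its id is not greater than the last written batch id.
  def addBatch(batchId: Long, records: Seq[Record]): Boolean =
    if (batchId <= latestBatchId) {
      false
    } else {
      // Resolve every topic before recording anything, so a record
      // without a topic fails the batch before any output is kept.
      val resolved = records.map(r => TopicResolution.resolveTopic(r, topicOption) -> r.value)
      written ++= resolved
      latestBatchId = batchId
      true
    }
}
```

Replaying a batch id, as can happen when a query restarts from its checkpoint, leaves `written` unchanged in this model because the second `addBatch` call with the same id returns `false`.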
### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
  .selectExpr("value as key", "value as value")
  .writeStream
  .format("kafka")
  .option("checkpointLocation", checkpointDir)
  .outputMode(OutputMode.Append)
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .queryName("kafkaStream")
  .start()

// Batch
val df = spark
  .sparkContext
  .parallelize(Seq("1", "2", "3", "4", "5"))
  .map(v => (topic, v))
  .toDF("topic", "value")

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .save()
```
Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie

Closes #17043 from tcondie/kafka-writer.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd6c6d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd6c6d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd6c6d5c

Branch: refs/heads/branch-2.1
Commit: fd6c6d5c363008a229759bf628edc0f6c5e00ade
Parents: ca7a7e8
Author: Tyson Condie
Authored: Mon Mar 6 16:39:05 2017 -0800
Committer: Tathagata Das
Committed: Mon Mar 6 16:53:52 2017 -0800

--
 .../apache/spark/sql/kafka010/KafkaSink.scala   |  43 ++
 .../sql/kafka010/KafkaSourceProvider.scala      |  83 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     | 123 ++
 .../apache/spark/sql/kafka010/KafkaWriter.scala |  97 +
 .../spark/sql/kafka010/KafkaSinkSuite.scala     | 412 +++
 5 files changed, 753 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/fd6c6d5c/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
new file mode 100644
index 000..08914d8
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
spark git commit: [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries
Repository: spark
Updated Branches:
  refs/heads/master f6471dc0d -> b0a5cd890

[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.

### Streaming Kafka Sink
- When addBatch is called
-- If batchId is greater than the last written batch
--- Write batch to Kafka. The topic is taken from the topic option if one is set, which overrides any topic in the record; otherwise it is taken from the record.
-- Else ignore

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider
- CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka
- The topic is taken from the topic option if one is set, which overrides any topic in the record; otherwise it is taken from the record.
- Save modes Append and ErrorIfExists are supported under identical semantics. Other save modes result in an AnalysisException

tdas zsxwing

## How was this patch tested?

### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
- write data with bad schema: various cases of writing data that does not conform to a proper schema, e.g., 1. no topic field or default topic, and 2. no value field
- write data with valid schema but wrong types: data with a complete schema but wrong types, e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
- write batch to kafka: simple write batch to Kafka, which goes through the same code path as the streaming scenario, so validity checks will not be redone here.
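The save-mode rule and the bad-schema test cases listed above suggest a set of pre-write checks, which can be sketched without Spark as follows. `KafkaWriteChecks`, its `ColType` and `Mode` hierarchies, and `validate` are hypothetical names for illustration only; the patch itself reports these problems as exceptions rather than `Either` values. The rule that `key` and `value` must be string or binary, and `topic` a string, is an assumption taken from Spark's Kafka integration documentation; the description above only says that integer keys and values are rejected.

```scala
object KafkaWriteChecks {
  // Hypothetical stand-ins for column data types.
  sealed trait ColType
  case object Str extends ColType
  case object Bin extends ColType
  case object IntT extends ColType

  // Hypothetical stand-ins for save modes.
  sealed trait Mode
  case object Append extends Mode
  case object ErrorIfExists extends Mode
  case object Overwrite extends Mode
  case object Ignore extends Mode

  private val stringOrBinary: Set[ColType] = Set(Str, Bin)

  // Returns Right(()) when the write may proceed, or Left(reason) otherwise.
  def validate(
      schema: Map[String, ColType],
      topicOption: Option[String],
      mode: Mode): Either[String, Unit] =
    if (mode != Append && mode != ErrorIfExists) {
      Left(s"save mode $mode is not supported")
    } else if (topicOption.isEmpty && !schema.contains("topic")) {
      Left("no topic option set and no 'topic' column in the schema")
    } else if (topicOption.isEmpty && schema("topic") != Str) {
      // Assumption: the topic column must be a string.
      Left("'topic' column must be a string")
    } else if (!schema.contains("value")) {
      Left("required 'value' column is missing")
    } else if (!stringOrBinary(schema("value"))) {
      Left("'value' column must be string or binary")
    } else if (schema.get("key").exists(t => !stringOrBinary(t))) {
      // The key column is optional, but if present it must have a valid type.
      Left("'key' column must be string or binary")
    } else {
      Right(())
    }
}
```

Checking the save mode first mirrors the description's point that Append and ErrorIfExists behave identically, so nothing else about the write depends on which of the two was chosen.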
### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
  .selectExpr("value as key", "value as value")
  .writeStream
  .format("kafka")
  .option("checkpointLocation", checkpointDir)
  .outputMode(OutputMode.Append)
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .queryName("kafkaStream")
  .start()

// Batch
val df = spark
  .sparkContext
  .parallelize(Seq("1", "2", "3", "4", "5"))
  .map(v => (topic, v))
  .toDF("topic", "value")

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .save()
```
Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie

Closes #17043 from tcondie/kafka-writer.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0a5cd89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0a5cd89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0a5cd89

Branch: refs/heads/master
Commit: b0a5cd89097c563e9949d8cfcf84d18b03b8d24c
Parents: f6471dc
Author: Tyson Condie
Authored: Mon Mar 6 16:39:05 2017 -0800
Committer: Tathagata Das
Committed: Mon Mar 6 16:39:05 2017 -0800

--
 .../apache/spark/sql/kafka010/KafkaSink.scala   |  43 ++
 .../sql/kafka010/KafkaSourceProvider.scala      |  83 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     | 123 ++
 .../apache/spark/sql/kafka010/KafkaWriter.scala |  97 +
 .../spark/sql/kafka010/KafkaSinkSuite.scala     | 412 +++
 5 files changed, 753 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
new file mode 100644
index 000..08914d8
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under