spark git commit: [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

2017-03-06 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ca7a7e8a8 -> fd6c6d5c3


[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch
queries, respectively, to Apache Kafka.
### Streaming Kafka Sink
- When addBatch is called (a minimal sketch follows this list):
-- If batchId is greater than the last written batch:
--- Write the batch to Kafka.
---- The topic is taken from the record's topic field, if present, or from the
topic option; when set, the option overrides the topic in the record.
-- Else ignore the batch, since it has already been written.
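
For illustration, a minimal sketch of this batch-skipping behavior; the class
name, the latestBatchId field, and the writeToKafka helper are all
hypothetical, not the actual KafkaSink code:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative sketch only; the real KafkaSink differs in detail.
class KafkaSinkSketch {
  // Highest batch id written so far; -1L means nothing has been written yet.
  @volatile private var latestBatchId = -1L

  def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      // Already written (e.g., the batch was replayed after failure
      // recovery): ignore it, which keeps the sink idempotent across retries.
    } else {
      writeToKafka(data) // hypothetical helper that sends each row to Kafka
      latestBatchId = batchId
    }
  }

  private def writeToKafka(data: DataFrame): Unit = {
    // Placeholder: the real sink runs a write task per partition, resolving
    // each record's topic from the row or from the "topic" option.
  }
}
```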

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider (sketched below).
- CreatableRelationProvider#createRelation will write the passed-in DataFrame
to Kafka.
- The topic is taken from the record's topic field, if present, or from the
topic option; when set, the option overrides the topic in the record.
- Save modes Append and ErrorIfExists are supported with identical semantics;
other save modes result in an AnalysisException.
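
A hedged sketch of the relation-provider side; the class name and the
writeRowsToKafka helper are hypothetical, and the real provider raises an
AnalysisException where this sketch throws a generic exception:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Illustrative sketch only; not the actual KafkaSourceProvider.
class KafkaProviderSketch extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    mode match {
      case SaveMode.Append | SaveMode.ErrorIfExists =>
        writeRowsToKafka(parameters, data) // both modes just append to Kafka
      case other =>
        // The real provider raises an AnalysisException here.
        throw new UnsupportedOperationException(
          s"Save mode $other not allowed for Kafka")
    }
    val ctx = sqlContext
    // Kafka cannot be read back through this path, so return a stub relation.
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }

  private def writeRowsToKafka(params: Map[String, String], df: DataFrame): Unit = {
    // Placeholder: would resolve the topic per record or from the topic option.
  }
}
```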

tdas zsxwing

## How was this patch tested?

### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes
an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic:
valid stream write with data that does not include a topic field, but the
configuration includes a default topic
- write data with bad schema: various cases of writing data that does not
conform to a proper schema, e.g., (1) no topic field or default topic, and
(2) no value field (see the validation sketch after this list)
- write data with valid schema but wrong types: data with a complete schema but
wrong types, e.g., key and value are integer types
- write to non-existing topic: write a stream to a topic that does not exist in
Kafka, which has been configured not to auto-create topics
- write batch to Kafka: a simple batch write to Kafka, which goes through the
same code path as the streaming scenario, so validity checks are not redone
here.
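
For illustration, the bad-schema and wrong-type cases reduce to checks along
these lines. validateSchema and its messages are hypothetical; the documented
sink schema expects an optional string topic and a string or binary value:

```scala
import org.apache.spark.sql.types.{BinaryType, StringType, StructType}

// Hypothetical helper mirroring the checks the tests above exercise.
def validateSchema(schema: StructType, defaultTopic: Option[String]): Unit = {
  // Case 1: no topic field and no default topic option -> error.
  if (schema.getFieldIndex("topic").isEmpty && defaultTopic.isEmpty) {
    sys.error("topic option required when no 'topic' attribute is present")
  }
  // Case 2: no value field -> error.
  val value = schema.find(_.name == "value")
    .getOrElse(sys.error("required attribute 'value' not found"))
  // Wrong-type case: value must be a string or binary column.
  value.dataType match {
    case StringType | BinaryType => // ok
    case t => sys.error(s"'value' attribute type must be string or binary, got $t")
  }
}
```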

### Examples
```scala
// Structured Streaming
// (assumes `import spark.implicits._` is in scope for the encoders and toDF)
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
 .selectExpr("value as key", "value as value")
 .writeStream
 .format("kafka")
 .option("checkpointLocation", checkpointDir)
 .outputMode(OutputMode.Append)
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .queryName("kafkaStream")
 .start()

// Batch
val df = spark
 .sparkContext
 .parallelize(Seq("1", "2", "3", "4", "5"))
 .map(v => (topic, v))
 .toDF("topic", "value")

df.write
 .format("kafka")
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .save()
```
Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Tyson Condie 

Closes #17043 from tcondie/kafka-writer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd6c6d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd6c6d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd6c6d5c

Branch: refs/heads/branch-2.1
Commit: fd6c6d5c363008a229759bf628edc0f6c5e00ade
Parents: ca7a7e8
Author: Tyson Condie 
Authored: Mon Mar 6 16:39:05 2017 -0800
Committer: Tathagata Das 
Committed: Mon Mar 6 16:53:52 2017 -0800

--
 .../apache/spark/sql/kafka010/KafkaSink.scala   |  43 ++
 .../sql/kafka010/KafkaSourceProvider.scala  |  83 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala | 123 ++
 .../apache/spark/sql/kafka010/KafkaWriter.scala |  97 +
 .../spark/sql/kafka010/KafkaSinkSuite.scala | 412 +++
 5 files changed, 753 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fd6c6d5c/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
new file mode 100644
index 000..08914d8
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * 

spark git commit: [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

2017-03-06 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master f6471dc0d -> b0a5cd890


[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0a5cd89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0a5cd89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0a5cd89

Branch: refs/heads/master
Commit: b0a5cd89097c563e9949d8cfcf84d18b03b8d24c
Parents: f6471dc
Author: Tyson Condie 
Authored: Mon Mar 6 16:39:05 2017 -0800
Committer: Tathagata Das 
Committed: Mon Mar 6 16:39:05 2017 -0800

--
 .../apache/spark/sql/kafka010/KafkaSink.scala   |  43 ++
 .../sql/kafka010/KafkaSourceProvider.scala  |  83 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala | 123 ++
 .../apache/spark/sql/kafka010/KafkaWriter.scala |  97 +
 .../spark/sql/kafka010/KafkaSinkSuite.scala | 412 +++
 5 files changed, 753 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b0a5cd89/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
new file mode 100644
index 000..08914d8
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under