[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17246


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106798265
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +374,213 @@ The following configurations are optional:
 
 
 
+## Writing Data to Kafka
+
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
+Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
+or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
+to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
+Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
+if writing the query is successful, then you can assume that the query output was written at least once. A possible
+solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
+that can be used to perform de-duplication when reading.
+
+Each row being written to Kafka has the following schema:
--- End diff --

The DataFrame being written to Kafka should have the following columns in the schema.





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106798256
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +374,213 @@ The following configurations are optional:
 
 
 
+## Writing Data to Kafka
+
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
+Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
+or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
+to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
+Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
+if writing the query is successful, then you can assume that the query output was written at least once. A possible
+solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
+that can be used to perform de-duplication when reading.
--- End diff --

+1 for this suggestion!
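The primary-key de-duplication idea quoted above can be illustrated without Spark. The following is a hypothetical plain-Python sketch (the `deduplicate` helper and sample records are invented for illustration): because Kafka writes are at least once, the reader keeps only the first record it sees for each unique key.

```python
# Hypothetical sketch of read-side de-duplication by a primary (unique) key.
# Kafka's at-least-once write semantics may deliver the same record more than
# once; the reader discards any record whose key it has already seen.

def deduplicate(records):
    """Keep the first record for each unique key, preserving order."""
    seen = set()
    result = []
    for key, value in records:
        if key not in seen:  # duplicates from retried writes share a key
            seen.add(key)
            result.append((key, value))
    return result

# Key "b" was written twice (e.g. a retried, unacknowledged send):
records = [("a", "v1"), ("b", "v2"), ("b", "v2"), ("c", "v3")]
print(deduplicate(records))  # [('a', 'v1'), ('b', 'v2'), ('c', 'v3')]
```

In Structured Streaming itself, a similar effect could be approximated with `dropDuplicates` on the key column after reading, though that is a design choice with its own state-retention cost.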





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106798236
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
 For Python applications, you need to add this above library and its dependencies when deploying your
 application. See the [Deploying](#deploying) subsection below.
 
-### Creating a Kafka Source Stream
+## Reading Data from Kafka
+
+### Creating a Kafka Source for Streaming Queries
 
 
 
 {% highlight scala %}
 
 // Subscribe to 1 topic
-val ds1 = spark
+val ds = spark
--- End diff --

nvm. minor point.





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106798211
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
 For Python applications, you need to add this above library and its dependencies when deploying your
 application. See the [Deploying](#deploying) subsection below.
 
-### Creating a Kafka Source Stream
+## Reading Data from Kafka
+
+### Creating a Kafka Source for Streaming Queries
 
 
 
 {% highlight scala %}
 
 // Subscribe to 1 topic
-val ds1 = spark
+val ds = spark
--- End diff --

hey ... `load()` will return a df not ds. may be a little confusing. 





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106722579
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .save()
+
+{% endhighlight %}
+
+
+
+Each row being written to Kafka has the following schema:
--- End diff --

I think it's better to put this section on required and optional columns before the examples.

[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106722231
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .save()
+
+{% endhighlight %}
+
+
+
+Each row being written to Kafka has the following schema:
+
+Column | Type
+------ | ----
+key (optional) | string or binary
+value (required) | string or 

[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106722120
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .save()
+
+{% endhighlight %}
+
+
+
+Each row being written to Kafka has the following schema:
+
+Column | Type
+------ | ----
+key (optional) | string or binary
+value (required) | string or 

[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106721924
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -3,9 +3,9 @@ layout: global
 title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
 ---
 
-Structured Streaming integration for Kafka 0.10 to poll data from Kafka.
+Structured Streaming integration for Kafka 0.10 to consume data from and produce data to Kafka.
--- End diff --

Let's simply use "read data" and "write data", here as well as in the titles.





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106721185
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
--- End diff --

Thanks for highlighting this though. This should be clearly documented.



[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106721109
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
--- End diff --

I think this section needs a big caveat regarding the lack of guarantees of Kafka writes: since Kafka does not support transactions, data may be duplicated due to failures and re-executions.
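The failure mode described here can be sketched without Kafka at all. The following hypothetical Python simulation (the `FakeBroker` class and `produce_with_retry` helper are invented for illustration) shows how a retry after a lost acknowledgment yields a duplicated record even though the broker wrote it the first time:

```python
# Hypothetical simulation of an at-least-once duplicate: the broker durably
# appends the record, but the acknowledgment is lost, so the producer retries
# and the record ends up in the log twice.

class FakeBroker:
    def __init__(self):
        self.log = []

    def append(self, record, ack_lost=False):
        self.log.append(record)  # broker receives and writes the record
        return not ack_lost      # ...but the ack may never reach the producer

def produce_with_retry(broker, record):
    # First attempt: the ack is lost in transit.
    acked = broker.append(record, ack_lost=True)
    if not acked:
        # The producer cannot distinguish "write failed" from "ack lost",
        # so it must retry -- producing a duplicate.
        broker.append(record)

broker = FakeBroker()
produce_with_retry(broker, "payload")
print(broker.log)  # ['payload', 'payload']
```

This is exactly why the write path can only promise at-least-once delivery: only a transactional or idempotent protocol on the broker side could collapse the retry into a single logical write.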





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106720890
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
--- End diff --

Writing output of ...





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106720832
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
--- End diff --

Writing output of...





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106720658
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
--- End diff --

nit: you can make all of the `df1` and `df2` just `df`





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106720722
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
--- End diff --

nit: did you mean df2? or just make all of them df?





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r106720282
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
--- End diff --

It's not. Kafka does not support transactions yet.
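[Editor's note] The docs under review suggest deduplicating on a primary (unique) key when reading back data written with at-least-once semantics. A minimal sketch of that idea in plain Python — not Spark or Kafka API code; the `id` field stands in for whatever unique key the writer embeds in each record:

```python
# Toy illustration: at-least-once delivery can duplicate records,
# so the reader keeps only the first record seen per primary key.
def dedup_by_key(records):
    """Return records with duplicates (by the 'id' key) removed."""
    seen = set()
    out = []
    for rec in records:
        if rec["id"] not in seen:
            seen.add(rec["id"])
            out.append(rec)
    return out

# Records as they might arrive after a broker retry duplicated id=2.
arrived = [
    {"id": 1, "value": "a"},
    {"id": 2, "value": "b"},
    {"id": 2, "value": "b"},  # duplicate from a retried write
    {"id": 3, "value": "c"},
]
print(dedup_by_key(arrived))  # -> records for ids 1, 2, 3, each once
```

In Structured Streaming itself, the analogous operation would be a dropDuplicates on the key column when reading the topic back.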



[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-10 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/17246#discussion_r105523893
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -373,11 +375,204 @@ The following configurations are optional:
 
 
 
+## Producing Data to Kafka
+
+### Writing Streaming Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val s2 = df2
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery s1 = df1
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery s2 = df1
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+s1 = df1 \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+s2 = df2 \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
+
+
+### Writing Batch Queries to Kafka
+
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
--- End diff --

is `save()` atomic?
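[Editor's note] As noted earlier in the thread (emails run newest-first), it is not: Kafka does not yet support transactions, so the write is at least once. A toy plain-Python simulation — not Spark or Kafka API code — of why at-least-once retries produce duplicates:

```python
# Toy simulation of an at-least-once producer: the broker stores each
# message, but if the ack back to the producer is lost, the producer
# cannot tell and resends, leaving a duplicate in the log.
def at_least_once_send(messages, ack_lost_for=frozenset()):
    """Return the broker's log after sending messages at least once."""
    log = []
    for msg in messages:
        log.append(msg)            # broker writes the record
        if msg in ack_lost_for:
            log.append(msg)        # ack lost -> producer retries -> duplicate
    return log

print(at_least_once_send(["a", "b", "c"], ack_lost_for={"b"}))
# -> "a" once, "b" twice (duplicated by the retry), "c" once
```

This is exactly the scenario the doc's "at least once write semantics" caveat describes: the record was written, but the producer cannot distinguish "write failed" from "ack lost".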



[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-10 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/17246

[SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka

## What changes were proposed in this pull request?

Add documentation that describes how to write streaming and batch queries to Kafka.

@zsxwing @tdas 

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark kafka-write-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17246.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17246


commit 172d4505e5583c541e4644b1eeb12f853bf638cd
Author: Tyson Condie 
Date:   2017-03-10T19:14:33Z

update



