[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17246 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106798265 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +374,213 @@ The following configurations are optional: +## Writing Data to Kafka + +Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that +Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries +or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs +to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record. +Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However, +if writing the query is successful, then you can assume that the query output was written at least once. A possible +solution to remove duplicates when reading the written data could be to introduce a primary (unique) key +that can be used to perform de-duplication when reading. + +Each row being written to Kafka has the following schema: --- End diff -- The Dataframe being written to Kafka should have the following columns in the schema. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106798256 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +374,213 @@ The following configurations are optional: +## Writing Data to Kafka + +Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that +Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries +or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs +to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record. +Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However, +if writing the query is successful, then you can assume that the query output was written at least once. A possible +solution to remove duplicates when reading the written data could be to introduce a primary (unique) key +that can be used to perform de-duplication when reading. --- End diff -- +1 for this suggestion! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106798236 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli For Python applications, you need to add this above library and its dependencies when deploying your application. See the [Deploying](#deploying) subsection below. -### Creating a Kafka Source Stream +## Reading Data from Kafka + +### Creating a Kafka Source for Streaming Queries {% highlight scala %} // Subscribe to 1 topic -val ds1 = spark +val ds = spark --- End diff -- nvm. minor point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106798211 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli For Python applications, you need to add this above library and its dependencies when deploying your application. See the [Deploying](#deploying) subsection below. -### Creating a Kafka Source Stream +## Reading Data from Kafka + +### Creating a Kafka Source for Streaming Queries {% highlight scala %} // Subscribe to 1 topic -val ds1 = spark +val ds = spark --- End diff -- hey ... `load()` will return a df not ds. may be a little confusing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106722579 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .save() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .save() + +{% endhighlight %} + + + +Each row being written to Kafka has the following schema: --- End diff -- I think its better to put this section of necessary and optional columns before the examples, righ
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106722231 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .save() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .save() + +{% endhighlight %} + + + +Each row being written to Kafka has the following schema: + +ColumnType + + key (optional) + string or binary + + + value (required) + string or b
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106722120 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .save() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .save() + +{% endhighlight %} + + + +Each row being written to Kafka has the following schema: + +ColumnType + + key (optional) + string or binary + + + value (required) + string or b
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106721924 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -3,9 +3,9 @@ layout: global title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) --- -Structured Streaming integration for Kafka 0.10 to poll data from Kafka. +Structured Streaming integration for Kafka 0.10 to consume data from and produce data to Kafka. --- End diff -- Lets simply use "read data" and "write data" Here as well as in the titles. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106721185 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .save() --- End diff -- Thanks for highlighting this though. This should be clearly documented. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spar
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106721109 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka --- End diff -- I think section needs a big caveat regarding the lack of guarantees of Kafka writes - since Kafka does not support transactions, data may be duplicated due to failures and reexecutions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106720890 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka --- End diff -- Writing output of ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106720832 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka --- End diff -- Writing output of... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106720658 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 --- End diff -- nit: you can make all of the `df1` and `df2` just `df` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106720722 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 --- End diff -- nit: did you mean df2? or just make all of them df? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r106720282 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .save() --- End diff -- Its not. Kafka does not support transactions yet. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For addit
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/17246#discussion_r105523893 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -373,11 +375,204 @@ The following configurations are optional: +## Producing Data to Kafka + +### Writing Streaming Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +val s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +val s2 = df2 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +StreamingQuery s1 = df1 + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .start() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +StreamingQuery s2 = df1 + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .start() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +s1 = df1 \ + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .start() + +# Write key-value data from a DataFrame to Kafka using a topic specified in the data +s2 = df2 \ + .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .start() + +{% endhighlight %} + + + +### Writing Batch Queries to Kafka + + + +{% highlight scala %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight java %} + +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic1") + .save() + +// Write key-value data from a DataFrame to Kafka using a topic specified in the data +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") + .write() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .save() + +{% endhighlight %} + + +{% highlight python %} + +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ + .write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("topic", "topic1") \ + .save() --- End diff -- is it save() atomic? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail:
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/17246 [SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka ## What changes were proposed in this pull request? Add documentation that describes how to write streaming and batch queries to Kafka. @zsxwing @tdas Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tcondie/spark kafka-write-docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17246.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17246 commit 172d4505e5583c541e4644b1eeb12f853bf638cd Author: Tyson Condie Date: 2017-03-10T19:14:33Z update --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org