Repository: spark
Updated Branches:
  refs/heads/master 1bb60ab83 -> 2512a1d42


[SPARK-26121][STRUCTURED STREAMING] Allow users to define prefix of Kafka's consumer group (group.id)

## What changes were proposed in this pull request?

Allow Spark Structured Streaming users to specify a prefix for the Kafka consumer group (`group.id`), instead of forcing consumer group ids of the form `spark-kafka-source-*`.
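
For example, a streaming query could opt in to a custom prefix roughly as follows (a minimal sketch, not part of this patch; the `SparkSession` value `spark`, broker address, topic name, and prefix value are placeholders):

```scala
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  // Optional source option added by this patch; the generated group.id becomes
  // "<groupIdPrefix>-<random UUID>-<metadataPath hashCode>" instead of
  // "spark-kafka-source-<random UUID>-<metadataPath hashCode>".
  .option("groupIdPrefix", "my-app")
  .load()
```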

## How was this patch tested?

Unit tests provided by Spark (backwards-compatible change, i.e., users can optionally use the functionality).

`mvn test -pl external/kafka-0-10`

Closes #23103 from zouzias/SPARK-26121.

Authored-by: Anastasios Zouzias <anastas...@sqooba.io>
Signed-off-by: cody koeninger <c...@koeninger.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2512a1d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2512a1d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2512a1d4

Branch: refs/heads/master
Commit: 2512a1d42911370854ca42d987c851128fa0b263
Parents: 1bb60ab
Author: Anastasios Zouzias <anastas...@sqooba.io>
Authored: Mon Nov 26 11:10:38 2018 -0600
Committer: cody koeninger <c...@koeninger.org>
Committed: Mon Nov 26 11:10:38 2018 -0600

----------------------------------------------------------------------
 docs/structured-streaming-kafka-integration.md  | 37 ++++++++++++--------
 .../sql/kafka010/KafkaSourceProvider.scala      | 18 ++++++++--
 2 files changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2512a1d4/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 71fd5b1..a549ce2 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -123,7 +123,7 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 </div>
 </div>
 
-### Creating a Kafka Source for Batch Queries 
+### Creating a Kafka Source for Batch Queries
 If you have a use case that is better suited to batch processing,
 you can create a Dataset/DataFrame for a defined range of offsets.
 
@@ -374,17 +374,24 @@ The following configurations are optional:
   <td>streaming and batch</td>
  <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
 </tr>
+<tr>
+  <td>groupIdPrefix</td>
+  <td>string</td>
+  <td>spark-kafka-source</td>
+  <td>streaming and batch</td>
+  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
+</tr>
 </table>
 
 ## Writing Data to Kafka
 
-Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that 
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
 Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
 or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
 to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
-Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However, 
+Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
 if writing the query is successful, then you can assume that the query output was written at least once. A possible
-solution to remove duplicates when reading the written data could be to introduce a primary (unique) key 
+solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
 that can be used to perform de-duplication when reading.

 The Dataframe being written to Kafka should have the following columns in schema:
@@ -405,8 +412,8 @@ The Dataframe being written to Kafka should have the following columns in schema
 </table>
 \* The topic column is required if the "topic" configuration option is not specified.<br>

-The value column is the only required option. If a key column is not specified then 
-a ```null``` valued key column will be automatically added (see Kafka semantics on 
+The value column is the only required option. If a key column is not specified then
+a ```null``` valued key column will be automatically added (see Kafka semantics on
 how ```null``` valued key values are handled). If a topic column exists then its value
 is used as the topic when writing the given row to Kafka, unless the "topic" configuration
 option is set i.e., the "topic" configuration option overrides the topic column.
@@ -568,7 +575,7 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
   .save()
-  
+
 {% endhighlight %}
 </div>
 </div>
@@ -576,23 +583,25 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
 
 ## Kafka Specific Configurations
 
-Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, 
-`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see 
+Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
+`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see
 [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
 parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
 for parameters related to writing data.

 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

-- **group.id**: Kafka source will create a unique group id for each query automatically.
+- **group.id**: Kafka source will create a unique group id for each query automatically. The user can
+set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
+is "spark-kafka-source".
 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
- where to start instead. Structured Streaming manages which offsets are consumed internally, rather 
- than rely on the kafka Consumer to do it. This will ensure that no data is missed when new 
+ where to start instead. Structured Streaming manages which offsets are consumed internally, rather
+ than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
  topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
  streaming query is started, and that resuming will always pick up from where the query left off.
-- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use 
+- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
  DataFrame operations to explicitly deserialize the keys.
-- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer. 
+- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
  Use DataFrame operations to explicitly deserialize the values.
 - **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
 DataFrame operations to explicitly serialize the keys into either strings or byte arrays.

http://git-wip-us.apache.org/repos/asf/spark/blob/2512a1d4/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 5034bd7..f770f0c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,7 +77,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -119,7 +119,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -159,7 +159,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -538,6 +538,18 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
       .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(
+      parameters: Map[String, String],
+      metadataPath: String): String = {
+    val groupIdPrefix = parameters
+      .getOrElse("groupIdPrefix", "spark-kafka-source")
+    s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
+  }
+
   /** Class to conveniently update Kafka config params, while logging the changes */
   private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
     private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

