Repository: spark
Updated Branches:
  refs/heads/master 1bb60ab83 -> 2512a1d42
[SPARK-26121][STRUCTURED STREAMING] Allow users to define prefix of Kafka's consumer group (group.id)

## What changes were proposed in this pull request?

Allow Spark Structured Streaming users to specify the prefix of the consumer group (group.id), rather than forcing consumer group ids of the form `spark-kafka-source-*`.

## How was this patch tested?

Unit tests provided by Spark (backwards-compatible change, i.e., users can opt in to the functionality):

`mvn test -pl external/kafka-0-10`

Closes #23103 from zouzias/SPARK-26121.

Authored-by: Anastasios Zouzias <anastas...@sqooba.io>
Signed-off-by: cody koeninger <c...@koeninger.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2512a1d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2512a1d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2512a1d4

Branch: refs/heads/master
Commit: 2512a1d42911370854ca42d987c851128fa0b263
Parents: 1bb60ab
Author: Anastasios Zouzias <anastas...@sqooba.io>
Authored: Mon Nov 26 11:10:38 2018 -0600
Committer: cody koeninger <c...@koeninger.org>
Committed: Mon Nov 26 11:10:38 2018 -0600

----------------------------------------------------------------------
 docs/structured-streaming-kafka-integration.md  | 37 ++++++++++++--------
 .../sql/kafka010/KafkaSourceProvider.scala      | 18 ++++++++--
 2 files changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2512a1d4/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 71fd5b1..a549ce2 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -123,7 +123,7 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 </div>
 </div>
 
-### Creating a Kafka Source for Batch Queries 
+### Creating a Kafka Source for Batch Queries
 If you have a use case that is better suited to batch processing,
 you can create a Dataset/DataFrame for a defined range of offsets.
@@ -374,17 +374,24 @@ The following configurations are optional:
   <td>streaming and batch</td>
   <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
 </tr>
+<tr>
+  <td>groupIdPrefix</td>
+  <td>string</td>
+  <td>spark-kafka-source</td>
+  <td>streaming and batch</td>
+  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
+</tr>
 </table>
 
 ## Writing Data to Kafka
 
-Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that 
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
 Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
 or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
 to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
-Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
+Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
 if writing the query is successful, then you can assume that the query output was written at least once. A possible
-solution to remove duplicates when reading the written data could be to introduce a primary (unique) key 
+solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
 that can be used to perform de-duplication when reading.
 
 The Dataframe being written to Kafka should have the following columns in schema:
@@ -405,8 +412,8 @@ The Dataframe being written to Kafka should have the following columns in schema
 </table>
 
 \* The topic column is required if the "topic" configuration option is not specified.<br>
 
-The value column is the only required option. If a key column is not specified then 
-a ```null``` valued key column will be automatically added (see Kafka semantics on 
+The value column is the only required option. If a key column is not specified then
+a ```null``` valued key column will be automatically added (see Kafka semantics on
 how ```null``` valued key values are handled). If a topic column exists then its value
 is used as the topic when writing the given row to Kafka, unless the "topic" configuration
 option is set i.e., the "topic" configuration option overrides the topic column.
@@ -568,7 +575,7 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
   .save()
-
+
 {% endhighlight %}
 </div>
 </div>
@@ -576,23 +583,25 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
 
 ## Kafka Specific Configurations
 
-Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, 
-`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see 
+Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
+`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see
 [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
 parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
 for parameters related to writing data.
 
 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
 
-- **group.id**: Kafka source will create a unique group id for each query automatically.
+- **group.id**: Kafka source will create a unique group id for each query automatically. The user can
+set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
+is "spark-kafka-source".
 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
-  where to start instead. Structured Streaming manages which offsets are consumed internally, rather 
-  than rely on the kafka Consumer to do it. This will ensure that no data is missed when new 
+  where to start instead. Structured Streaming manages which offsets are consumed internally, rather
+  than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
   topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
   streaming query is started, and that resuming will always pick up from where the query left off.
-- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use 
+- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
   DataFrame operations to explicitly deserialize the keys.
-- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer. 
+- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
   Use DataFrame operations to explicitly deserialize the values.
 - **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
 DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
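The documentation change above is the user-facing contract. As a quick illustration before the implementation diff, here is a minimal sketch of a streaming read that opts in to the new option; the bootstrap servers, topic name, and prefix value are hypothetical placeholders, and only the `groupIdPrefix` option itself comes from this patch:

// Minimal usage sketch (hypothetical servers/topic/prefix). With this patch,
// generated consumer groups take the form "<groupIdPrefix>-<uuid>-<hash>"
// instead of always "spark-kafka-source-<uuid>-<hash>".
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("GroupIdPrefixExample").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("groupIdPrefix", "analytics-team") // new, optional; defaults to "spark-kafka-source"
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Everything else in the sketch is the pre-existing Kafka source API described earlier in the same document.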
http://git-wip-us.apache.org/repos/asf/spark/blob/2512a1d4/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 5034bd7..f770f0c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,7 +77,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -119,7 +119,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -159,7 +159,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -538,6 +538,18 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
       .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(
+      parameters: Map[String, String],
+      metadataPath: String): String = {
+    val groupIdPrefix = parameters
+      .getOrElse("groupIdPrefix", "spark-kafka-source")
+    s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
+  }
+
   /** Class to conveniently update Kafka config params, while logging the changes */
   private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
     private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
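As a usage note on the new helper: the generated group.id is simply the prefix, a random UUID, and the hash code of the checkpoint metadata path, joined by dashes. Below is a standalone, illustration-only mirror of that logic; the checkpoint path is hypothetical and `sampleGroupId` is not part of the patch:

import java.util.UUID

// Mirrors the new streamingUniqueGroupId helper outside of KafkaSourceProvider,
// purely to show the shape of the generated group.id.
def sampleGroupId(parameters: Map[String, String], metadataPath: String): String = {
  val groupIdPrefix = parameters.getOrElse("groupIdPrefix", "spark-kafka-source")
  s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}

sampleGroupId(Map("groupIdPrefix" -> "analytics-team"), "/tmp/checkpoint/sources/0")
// => "analytics-team-<random uuid>-<hash of metadataPath>"
sampleGroupId(Map.empty, "/tmp/checkpoint/sources/0")
// => "spark-kafka-source-<random uuid>-<hash of metadataPath>" (unchanged default)

Note that, as committed, the helper reads the raw `parameters` map, so the option key must be spelled exactly `groupIdPrefix`; the lower-casing into `caseInsensitiveParams` happens only after the group id is generated.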