[ 
https://issues.apache.org/jira/browse/SPARK-31805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arseniy Tashoyan updated SPARK-31805:
-------------------------------------
    Description: 
[Spark Structured Streaming - Kafka 
integration|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries]
 provides the 
[assign|https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign-java.util.Collection-]
 strategy to consume data from Kafka. With this strategy, the user manually 
assigns the topic partitions to consume. According to the KafkaConsumer 
documentation, manual partition assignment does not use the consumer group 
management functionality, so the consumer group is irrelevant for the "assign" 
strategy.
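For reference, a minimal batch query with the "assign" strategy might look 
like this (broker address, topic and partition numbers are placeholders):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("assign-example").getOrCreate()

// "assign" takes a JSON map of topic -> partitions to read
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("assign", """{"topic1":[0,1]}""")
  .load()
{code}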

Nevertheless, when creating a consumer to read data, Spark sets an internally 
generated group id, like this:

[KafkaRelation|https://github.com/apache/spark/blob/bcadd5c3096109878fe26fb0d57a9b7d6fdaa257/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala#L60]:
{code:scala}
val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
{code}

This is done for every consumer strategy, including "assign". The problem is 
that on a secured Kafka cluster a client cannot use an arbitrary consumer 
group id, so a Structured Streaming application fails with an exception like:

{code:none}
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to 
access group: spark-kafka-relation-ecab045d-4ee6-425e-88a0-495d4100a013-driver-0
{code}

With Spark 2.4.5, the only option is to reconfigure the broker by adding the 
needed ACL entries (see, for example, [this 
answer|https://stackoverflow.com/a/59300360] on Stack Overflow).
 
With Spark 3.0.0, the problem can be avoided with either of two workarounds 
(see the sketch after this list):
- SPARK-26121: specify a custom prefix for the consumer group id generated by 
Spark, chosen so that the broker allows it
- SPARK-26350: specify a custom group id
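Both workarounds, sketched against the 3.0.0 options (option values are 
placeholders):

{code:scala}
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("assign", """{"topic1":[0,1]}""")
  // SPARK-26121: custom prefix for the generated group id
  .option("groupIdPrefix", "my-allowed-prefix")
  // SPARK-26350: alternatively, a fixed group id
  // .option("kafka.group.id", "my-allowed-group")
  .load()
{code}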

However, with the "assign" strategy the user should not have to think about 
consumer groups at all: the group is simply not used. A better fix would 
therefore be to leave the consumer property "group.id" unset when the 
"assign" strategy is in effect.



> Spark Structured Streaming with "assign" KafkaConsumer mode still uses 
> group.id
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-31805
>                 URL: https://issues.apache.org/jira/browse/SPARK-31805
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.5, 3.0.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>


