[ https://issues.apache.org/jira/browse/SPARK-31805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arseniy Tashoyan updated SPARK-31805:
-------------------------------------
Description:

[Spark Structured Streaming - Kafka integration|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries] provides the [assign|https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign-java.util.Collection-] strategy for consuming data from Kafka. This strategy means manual assignment of topic partitions to the consumer. According to the KafkaConsumer specification, the consumer group is not used with the "assign" strategy.

Nevertheless, when creating a consumer to read data, Spark provides an internally generated group id, as in [KafkaRelation|https://github.com/apache/spark/blob/bcadd5c3096109878fe26fb0d57a9b7d6fdaa257/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala#L60]:
{code:scala}
val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
{code}
This is done for every consumer strategy, even for "assign". The problem is that on a secured Kafka cluster a client cannot use an arbitrary consumer group id, so a Structured Streaming application fails with an exception like:
{code:none}
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-relation-ecab045d-4ee6-425e-88a0-495d4100a013-driver-0
{code}
With Spark 2.4.5, the only remedy is to reconfigure the broker: add the needed entries to the ACL (see, for example, [this answer|https://stackoverflow.com/a/59300360] on StackOverflow). With Spark 3.0.0, the problem can be avoided via two workarounds:
- SPARK-26121: specify a custom prefix for the consumer group id generated by Spark (one allowed by the broker)
- SPARK-26350: specify a custom group id

However, with the "assign" strategy the user should not have to care about the consumer group at all - the consumer group is disregarded.
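The KafkaConsumer behavior referred to above can be illustrated with a plain Kafka client that sets no "group.id" at all: with manual assignment, the consumer never contacts the group coordinator, so no group authorization is involved. A minimal sketch (the broker address, topic, and partition are hypothetical; this requires a running Kafka cluster):
{code:scala}
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")  // hypothetical broker
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)
// Note: no "group.id" is set.

val consumer = new KafkaConsumer[String, String](props)
val partition = new TopicPartition("events", 0)  // hypothetical topic
// Manual assignment: no subscription, no rebalancing, no group coordination.
consumer.assign(java.util.Arrays.asList(partition))
consumer.seekToBeginning(java.util.Arrays.asList(partition))
// Polling works without any consumer group; only commitSync/commitAsync
// would require a group id, and Spark tracks offsets itself anyway.
val records = consumer.poll(Duration.ofSeconds(1))
{code}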
Therefore a better fix could be not to set the consumer property "group.id" when using the "assign" strategy.
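Until such a fix lands, the two Spark 3.0.0 workarounds listed above can be applied from the user's side. A sketch (topic, partitions, and group names are hypothetical):
{code:scala}
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  // "assign" strategy: explicit topic partitions as a JSON string
  .option("assign", """{"events":[0,1]}""")
  // SPARK-26350: pin the group id to one the broker's ACLs allow
  .option("kafka.group.id", "authorized-group")
  // SPARK-26121 (alternative): keep the random suffix but control the prefix
  // .option("groupIdPrefix", "authorized-prefix")
  .load()
{code}
Note that with "kafka.group.id" all concurrent queries sharing that id would appear to the broker as one consumer group, which is why a dedicated fix for the "assign" strategy would still be preferable.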
> Spark Structured Streaming with "assign" KafkaConsumer mode still uses group.id
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-31805
>                 URL: https://issues.apache.org/jira/browse/SPARK-31805
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.5, 3.0.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org