Sethupathi,

Let me extract the important parts of what I shared:

1. "This ensures that each Kafka source has its own consumer group that
does not face interference from any other consumer"
2. Consumers may take data from each other, and the offset calculation may
return wrong results (that is why "extreme caution" is recommended in the
Structured Streaming doc, which still applies here)
3. yes
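
To make point 1 concrete, here is a minimal, self-contained sketch of the behaviour described in this thread. It is a simplified model and not the actual Spark source: the object name, the helper `executorParams`, and the exact "spark-executor-" separator are assumptions made for illustration.

```scala
// Simplified model of the executor group id override discussed above.
// Not the Spark implementation; names here are hypothetical.
object ExecutorGroupIdSketch {
  // Kafka consumer configuration key for the consumer group.
  val GroupIdKey = "group.id"

  // Assumption: executors use the driver's group id with a
  // "spark-executor-" prefix, so driver and executors never share a group.
  def executorParams(driverParams: Map[String, String]): Map[String, String] = {
    val original = driverParams.getOrElse(GroupIdKey, "")
    driverParams + (GroupIdKey -> ("spark-executor-" + original))
  }

  def main(args: Array[String]): Unit = {
    val driver = Map(GroupIdKey -> "authorized-group")
    val executor = executorParams(driver)
    println(s"driver   group.id = ${driver(GroupIdKey)}")
    println(s"executor group.id = ${executor(GroupIdKey)}")
  }
}
```

Because the two ids differ, a cluster that authorizes only the original group would presumably reject the executor-side id, which matches the exception reported in the original message.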

BR,
G


On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T <sethu....@googlemail.com>
wrote:

> Gabor,
>
> Thanks for the quick response and for sharing the Spark 3.0 details. We
> need to use Spark Streaming (KafkaUtils.createDirectStream) rather than
> Structured Streaming, following this document:
> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
> Let me restate the issue for better understanding. The
> spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
> Kafka connector prefixes group.id with "spark-executor" for executors,
> while the driver uses the original group id.
>
> *Here is the code where the executor constructs its executor-specific
> group id:*
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
> (line 212)
>
>
> *It would be great if you could answer the following questions.*
>
> #1 What was the specific reason for prefixing the group id in the executor?
>
> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
> library by removing the group id prefix at line 212 in KafkaUtils.scala?
>
> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
> advisable to use it in production?
>
> *Here is my Spark Streaming code snippet:*
>
> val kafkaParams = Map[String, Object](
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>   "security.protocol" -> "SSL",
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, Message](topicsSet, kafkaParams)
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>
>
> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Let me share the relevant part of the Spark 3.0 documentation (it covers
>> Structured Streaming rather than the DStreams you mentioned, but it is
>> still relevant):
>>
>> Option: kafka.group.id
>> Value: string
>> Default: none
>> Query type: streaming and batch
>> Meaning: The Kafka group id to use in Kafka consumer while reading from
>> Kafka. Use this with caution. By default, each query generates a unique
>> group id for reading data. This ensures that each Kafka source has its
>> own consumer group that does not face interference from any other
>> consumer, and therefore can read all of the partitions of its subscribed
>> topics. In some scenarios (for example, Kafka group-based authorization),
>> you may want to use a specific authorized group id to read data. You can
>> optionally set the group id. However, do this with extreme caution as it
>> can cause unexpected behavior. Concurrently running queries (both, batch
>> and streaming) or sources with the same group id are likely to interfere
>> with each other, causing each query to read only part of the data. This
>> may also occur when queries are started/restarted in quick succession. To
>> minimize such issues, set the Kafka consumer session timeout (by setting
>> option "kafka.session.timeout.ms") to be very small. When this is set,
>> option "groupIdPrefix" will be ignored.
>>
>> I think it answers your questions.
>>
>> As a general suggestion, it may be worth revisiting Spark 3.0, because
>> Structured Streaming has another interesting option:
>>
>> Option: groupIdPrefix
>> Value: string
>> Default: spark-kafka-source
>> Query type: streaming and batch
>> Meaning: Prefix of consumer group identifiers (`group.id`) that are
>> generated by structured streaming queries. If "kafka.group.id" is set,
>> this option will be ignored.
>>
>> BR,
>> G
>>
>>
>> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>> <sethu....@googlemail.com.invalid> wrote:
>>
>>> Hi Team,
>>>
>>> We have a secured Kafka cluster (which only allows consuming from a
>>> pre-configured, authorized consumer group), and there is a scenario
>>> where we want to use Spark Streaming to consume from it. So we have
>>> decided to use the spark-streaming-kafka-0-10
>>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>> API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API,
>>> etc.). When I deployed the application in cluster mode, I realized that
>>> the actual group id is prefixed with "spark-executor" in the executor
>>> configuration (the executor tries to connect to Kafka with
>>> "spark-executor" + actual group id, which does not exist, and gets an
>>> exception).
>>>
>>> *Here is the code where the executor constructs its executor-specific
>>> group id:*
>>>
>>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>> (line 212)
>>>
>>> *Here are my questions:*
>>>
>>> #1 What was the specific reason for prefixing the group id in the executor?
>>>
>>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>> library by removing the group id prefix?
>>>
>>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
>>> advisable to use it in production?
>>>
>>> *Here is my Spark Streaming code snippet:*
>>>
>>> val kafkaParams = Map[String, Object](
>>>
>>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>>   "security.protocol" -> "SSL",
>>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>>> )
>>>
>>> val stream = KafkaUtils.createDirectStream[String, Message](
>>>   ssc,
>>>   PreferConsistent,
>>>   Subscribe[String, Message](topicsSet, kafkaParams)
>>> )
>>>
>>> ---
>>> Thanks in Advance,
>>> Sethupathi.T
>>>
>>
