Gabor,

Thanks for the clarification.

Thanks

On Fri, Sep 6, 2019 at 12:38 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Sethupathi,
>
> Let me extract the important parts from what I've shared:
>
> 1. "This ensures that each Kafka source has its own consumer group that
> does not face interference from any other consumer"
> 2. Consumers may consume each other's data, and offset calculation may give
> back wrong results (that's the reason "extreme caution" is recommended in
> the Structured Streaming doc, which still applies here)
> 3. yes
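>
> For reference, here is a simplified, from-memory sketch of what the
> connector does around line 212 of KafkaUtils.scala (the names approximate
> the real code, so treat it as illustration rather than the exact source):
>
> private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
>   // Executors must never auto-commit; Spark manages offsets itself
>   kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
>   // Executors only read offsets the driver assigned, never reset on their own
>   kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>   // Driver and executors are put in different consumer groups so the
>   // executors' fetches don't disturb the driver's group membership
>   val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
>   kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
> }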
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T <sethu....@googlemail.com>
> wrote:
>
>> Gabor,
>>
>> Thanks for the quick response and for sharing the Spark 3.0 details. We
>> need to use Spark Streaming (KafkaUtils.createDirectStream) rather than
>> Structured Streaming, following this document:
>> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
>> To re-iterate the issue for better understanding: the
>> spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> Kafka connector prefixes "spark-executor-" + group.id for executors, while
>> the driver uses the original group id.
>>
>> *Here is the code where the executor constructs the executor-specific group id:*
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>> at line 212.
>>
>>
>> *It would be great if you could answer the following questions.*
>>
>> #1 What was the specific reason for prefixing the group id on executors?
>>
>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> library that removes the group id prefix at line 212 of KafkaUtils.scala?
>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean, and is
>> it advisable to use in production?
>>
>> *Here is my Spark Streaming code snippet:*
>>
>> val kafkaParams = Map[String, Object](
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>   "security.protocol" -> "SSL",
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>   ssc,
>>   PreferConsistent,
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>> )
>>
>> ---
>> Thanks in Advance,
>> Sethupathi.T
>>
>>
>> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Let me share the relevant part of the Spark 3.0 documentation (it covers
>>> Structured Streaming rather than the DStreams you've mentioned, but it is
>>> still relevant):
>>>
>>> kafka.group.id (string, default: none, for streaming and batch): The Kafka
>>> group id to use in the Kafka consumer while reading from Kafka. Use this
>>> with caution. By default, each query generates a unique group id for
>>> reading data. This ensures that each Kafka source has its own consumer
>>> group that does not face interference from any other consumer, and
>>> therefore can read all of the partitions of its subscribed topics. In some
>>> scenarios (for example, Kafka group-based authorization), you may want to
>>> use a specific authorized group id to read data. You can optionally set
>>> the group id. However, do this with extreme caution as it can cause
>>> unexpected behavior. Concurrently running queries (both batch and
>>> streaming) or sources with the same group id are likely to interfere with
>>> each other, causing each query to read only part of the data. This may
>>> also occur when queries are started/restarted in quick succession. To
>>> minimize such issues, set the Kafka consumer session timeout (by setting
>>> option "kafka.session.timeout.ms") to be very small. When this is set,
>>> option "groupIdPrefix" will be ignored.
>>>
>>> I think it answers your questions.
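>>>
>>> To make that concrete, here is a minimal sketch of a Structured Streaming
>>> Kafka source using an authorized group id (the broker address, topic, and
>>> group name are placeholders, not from your setup):
>>>
>>> val df = spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "broker:9093")  // placeholder address
>>>   .option("subscribe", "my-topic")                   // placeholder topic
>>>   // Fixed, authorized group id; use with extreme caution per the doc above
>>>   .option("kafka.group.id", "authorized-group")
>>>   // Keep the consumer session timeout small to limit interference
>>>   .option("kafka.session.timeout.ms", "10000")
>>>   .load()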
>>>
>>> As a general suggestion, it may be worth revisiting Spark 3.0, because
>>> Structured Streaming has another interesting feature:
>>>
>>> groupIdPrefix (string, default: spark-kafka-source, for streaming and
>>> batch): Prefix of consumer group identifiers (`group.id`) that are
>>> generated by Structured Streaming queries. If "kafka.group.id" is set,
>>> this option will be ignored.
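>>>
>>> In the sketch above, using this instead of "kafka.group.id" would look
>>> like the following (the prefix is a placeholder; the generated group ids
>>> then start with it rather than with spark-kafka-source):
>>>
>>>   .option("groupIdPrefix", "my-authorized-prefix")  // placeholder prefix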
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>>> <sethu....@googlemail.com.invalid> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> We have a secured Kafka cluster (which only allows consuming from
>>>> pre-configured, authorized consumer groups), and there is a scenario where
>>>> we want to use Spark Streaming to consume from it, so we have decided to
>>>> use the spark-streaming-kafka-0-10
>>>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>>> API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API,
>>>> etc.). When I deploy the application in cluster mode, I realized that the
>>>> actual group id has been prefixed with "spark-executor-" in the executor
>>>> configuration (the executor tries to connect to Kafka with
>>>> "spark-executor-" + the actual group id, which does not really exist, so
>>>> it gets an exception).
>>>>
>>>> *Here is the code where the executor constructs the executor-specific group id:*
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>>> at line 212.
>>>>
>>>> *Here are my questions:*
>>>>
>>>> #1 What was the specific reason for prefixing the group id on executors?
>>>>
>>>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>>>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>>> library that removes the group id prefix?
>>>>
>>>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean, and
>>>> is it advisable to use in production?
>>>>
>>>> *Here is my Spark Streaming code snippet:*
>>>>
>>>> val kafkaParams = Map[String, Object](
>>>>
>>>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>>>   "security.protocol" -> "SSL",
>>>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>>>> )
>>>>
>>>> val stream = KafkaUtils.createDirectStream[String, Message](
>>>>   ssc,
>>>>   PreferConsistent,
>>>>   Subscribe[String, Message](topicsSet, kafkaParams)
>>>> )
>>>>
>>>> ---
>>>> Thanks in Advance,
>>>> Sethupathi.T
>>>>
>>>
