Re: Problem with Kafka group.id

2020-03-23 Thread Sethupathi T
I had exactly the same issue. As a temporary fix, I took the open-source code
from GitHub, modified the mandatory group.id logic, and built a customized
library.
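For reference, the Spark 3.0 documentation quoted later in this thread says the Structured Streaming Kafka source exposes a `kafka.group.id` reader option for exactly this ACL scenario. A minimal sketch follows, assuming Spark 3.0+; the broker, topic, and group names are placeholders, not from the original mail:

```scala
// Sketch (Spark 3.0+): pass an ACL-authorized consumer group to the
// Structured Streaming Kafka source via the "kafka.group.id" option.
// Broker, topic, and group names below are illustrative placeholders.
import org.apache.spark.sql.SparkSession

object AuthorizedGroupRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("authorized-group-read").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9093")
      .option("subscribe", "events")
      .option("kafka.group.id", "authorized-group") // the ACL-approved group
      .load()

    // Per the Spark docs: sharing a group id across concurrently running
    // queries can cause each query to read only part of the data.
    df.writeStream.format("console").start().awaitTermination()
  }
}
```

The same option is available from PySpark via `.option("kafka.group.id", ...)` on the reader.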

Thanks,

On Tue, Mar 17, 2020 at 7:34 AM Sjoerd van Leent <
sjoerd.van.le...@alliander.com> wrote:

> Dear reader,
>
>
>
> I must set the group.id for Kafka, as our Kafka cluster is under ACL
> control; however, doing so gives me the error:
>
>
>
> Kafka option 'group.id' is not supported as user-specified consumer
> groups are not used to track offsets.
>
>
>
> This won't work: not being able to set it effectively disqualifies Spark
> for use within our organization. How can I force (Py)Spark to respect the
> group.id we supply?
>
>
>
> Kind regards,
>
>
>
> *Sjoerd van Leent*
>
> Systeem Engineer | IT AST-B CSC
>
>
>
> *M   *+31 6 11 24 52 27
> *E *   sjoerd.van.le...@alliander.com
>
>
> *Alliander N.V.*  .  Postbus 50, 6920 AB Duiven, Nederland  .
> Locatiecode: 2PB2100  .  Utrechtseweg 68, 6812 AH Arnhem  .
> KvK 09104351 Arnhem  .  *www.alliander.com*
>
>
>



Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-06 Thread Sethupathi T
Gabor,

Thanks for the clarification.

Thanks

On Fri, Sep 6, 2019 at 12:38 AM Gabor Somogyi 
wrote:

> Sethupathi,
>
> Let me extract the important part of what I've shared:
>
> 1. "This ensures that each Kafka source has its own consumer group that
> does not face interference from any other consumer"
> 2. Consumers may consume each other's data, and offset calculation may give
> back a wrong result (that's the reason "extreme caution" is recommended in
> the Structured Streaming doc, which still applies here)
> 3. yes
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T 
> wrote:
>
>> Gabor,
>>
>> Thanks for the quick response and for sharing the Spark 3.0 details. We
>> need to use Spark Streaming (KafkaUtils.createDirectStream) rather than
>> Structured Streaming, following this document:
>> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
>> To restate the issue for clarity: the spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> Kafka connector prefixes "spark-executor-" to the group.id on executors,
>> while the driver uses the original group id.
>>
>> *Here is the code where the executor constructs its executor-specific
>> group id* (line # 212):
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>
>>
>> *It would be great if you could answer the following questions:*
>>
>> #1 What was the specific reason for prefixing the group id on executors?
>>
>> #2 Would it be harmful if I built a custom spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> library with the group id prefix at line # 212 of KafkaUtils.scala removed?
>>
>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean, and is
>> it advisable to use it in production?
>>
>> *Here is my Spark Streaming code snippet:*
>>
>> val kafkaParams = Map[String, Object](
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>   "security.protocol" -> "SSL",
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>   ssc,
>>   PreferConsistent,
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>> )
>>
>> ---
>> Thanks in Advance,
>> Sethupathi.T
>>
>>
>> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
>> wrote:
>>
>>> Hi,
>>>
>>> Let me share the relevant part of the Spark 3.0 documentation (it covers
>>> Structured Streaming rather than the DStreams you mentioned, but it still
>>> applies):
>>>
>>> kafka.group.id (string, default: none; streaming and batch): The Kafka
>>> group id to use in the Kafka consumer while reading from Kafka. Use this
>>> with caution. By default, each query generates a unique group id for
>>> reading data. This ensures that each Kafka source has its own consumer
>>> group that does not face interference from any other consumer, and
>>> therefore can read all of the partitions of its subscribed topics. In some
>>> scenarios (for example, Kafka group-based authorization), you may want to
>>> use a specific authorized group id to read data. You can optionally set
>>> the group id. However, do this with extreme caution as it can cause
>>> unexpected behavior. Concurrently running queries (both batch and
>>> streaming) or sources with the same group id are likely to interfere with
>>> each other, causing each query to read only part of the data. This may
>>> also occur when queries are started/restarted in quick succession. To
>>> minimize such issues, set the Kafka consumer session timeout (by setting
>>> option "kafka.session.timeout.ms") to be very small. When this is set,
>>> option "groupIdPrefix" will be ignored.
>>> I think it answers your questions.
>>>
>>> As a general suggestion, it may be worth revisiting Spark 3.0 because
>>> Structured Streaming has another interesting feature:
>>> groupIdPrefix (string, default: spark-kafka-source; streaming and batch):
>>> Prefix of consumer group identifiers (`group.id`) that are generated by
>>> structured streaming queries. If "kafka.group.id" is set, this option
>>> will be ignored.
>>>
>>> BR,
>>> G


[Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Hi Team,

We have a secured Kafka cluster (which only allows consuming through
pre-configured, authorized consumer groups), and we have a scenario where we
want to use Spark Streaming to consume from it. We therefore decided to use
spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
(it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When
I deploy the application in cluster mode, I noticed that the actual group id
is prefixed with "spark-executor-" in the executor configuration (the
executor tries to connect to Kafka with "spark-executor-" + the actual group
id, which does not exist, and gets an exception).

*Here is the code where the executor constructs its executor-specific group
id* (line # 212):

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
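For illustration, the rewrite the connector performs can be sketched as a pure function. This is a simplified sketch of the behavior described above, not the actual Spark code; the object and method names below are hypothetical, while the real logic lives in KafkaUtils.scala:

```scala
// Sketch of the executor-side group.id rewrite done by the
// spark-streaming-kafka-0-10 connector. Names here are illustrative;
// the real implementation is in KafkaUtils.scala (line # 212 in v2.2.0).
object GroupIdPrefixSketch {
  // The driver keeps the original group id; executors get a prefixed one.
  def executorGroupId(driverGroupId: String): String =
    "spark-executor-" + driverGroupId

  def main(args: Array[String]): Unit = {
    val driverGroup = "payments-consumers" // hypothetical authorized group
    println(executorGroupId(driverGroup))  // spark-executor-payments-consumers
    // This is why an ACL authorizing only "payments-consumers" rejects the
    // executors: they join as "spark-executor-payments-consumers".
  }
}
```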

*Here are my questions:*

#1 What was the specific reason for prefixing the group id on executors?

#2 Would it be harmful if I built a custom spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
library with the group id prefix removed?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean, and is
it advisable to use it in production?

*Here is my Spark Streaming code snippet:*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)
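Since the snippet sets enable.auto.commit to false, a common companion pattern from the kafka-0-10 integration guide is committing offsets back to Kafka after processing. A hedged sketch, reusing the `stream` and `ssc` names from the snippet above (the processing step is a placeholder):

```scala
// Sketch: manual offset-commit pattern from the spark-streaming-kafka-0-10
// integration guide, continuing the snippet above (assumes the `stream` and
// `ssc` values defined there are in scope).
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the Kafka offset ranges this batch covers.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // Commit the consumed offsets back to Kafka asynchronously.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
```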

---
Thanks in Advance,
Sethupathi.T