Gabor,

Thanks for the clarification.

Thanks

On Fri, Sep 6, 2019 at 12:38 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Sethupathi,
>
> Let me extract the important parts of what I've shared:
>
> 1. "This ensures that each Kafka source has its own consumer group that
>    does not face interference from any other consumer"
> 2. Consumers may eat each other's data, and offset calculation may give
>    back a wrong result (that's the reason why "extreme caution" is
>    recommended in the Structured Streaming doc, which still applies here)
> 3. yes
>
> BR,
> G
>
> On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T <sethu....@googlemail.com>
> wrote:
>
>> Gabor,
>>
>> Thanks for the quick response and for sharing about Spark 3.0. We need to
>> use Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
>> Streaming, following this document:
>> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
>> I am reiterating the issue for better understanding.
>>
>> The spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> Kafka connector prefixes group.id with "spark-executor" for executors; the
>> driver uses the original group id.
>>
>> *Here is the code where the executor constructs the executor-specific group id*
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>> line # 212,
>>
>> *It would be great if you could answer the following questions.*
>>
>> #1 What was the specific reason for prefixing the group id in the executor?
>>
>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> library by removing the group id prefix at line # 212 in KafkaUtils.scala?
>>
>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
>> advisable to use in production?
>>
>> *Here is my Spark Streaming code snippet*
>>
>> val kafkaParams = Map[String, Object](
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>   "security.protocol" -> "SSL",
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>   ssc,
>>   PreferConsistent,
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>> )
>>
>> ---
>> Thanks in Advance,
>> Sethupathi.T
>>
>> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Let me share part of the Spark 3.0 documentation (Structured Streaming,
>>> not the DStreams you've mentioned, but still relevant):
>>>
>>> Option: kafka.group.id
>>> Value: string
>>> Default: none
>>> Query type: streaming and batch
>>> Meaning: The Kafka group id to use in Kafka consumer while reading from
>>> Kafka. Use this with caution. By default, each query generates a unique
>>> group id for reading data. This ensures that each Kafka source has its own
>>> consumer group that does not face interference from any other consumer,
>>> and therefore can read all of the partitions of its subscribed topics. In
>>> some scenarios (for example, Kafka group-based authorization), you may
>>> want to use a specific authorized group id to read data. You can
>>> optionally set the group id.
>>> However, do this with extreme caution as it can cause unexpected behavior.
>>> Concurrently running queries (both batch and streaming) or sources with
>>> the same group id are likely to interfere with each other, causing each
>>> query to read only part of the data. This may also occur when queries are
>>> started/restarted in quick succession. To minimize such issues, set the
>>> Kafka consumer session timeout (by setting option
>>> "kafka.session.timeout.ms") to be very small. When this is set, option
>>> "groupIdPrefix" will be ignored.
>>>
>>> I think it answers your questions.
>>>
>>> As a general suggestion, it may be worth revisiting Spark 3.0, because
>>> Structured Streaming has another interesting feature:
>>>
>>> Option: groupIdPrefix
>>> Value: string
>>> Default: spark-kafka-source
>>> Query type: streaming and batch
>>> Meaning: Prefix of consumer group identifiers (`group.id`) that are
>>> generated by structured streaming queries. If "kafka.group.id" is set,
>>> this option will be ignored.
>>>
>>> BR,
>>> G
>>>
>>> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>>> <sethu....@googlemail.com.invalid> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> We have a secured Kafka cluster (which only allows consuming from the
>>>> pre-configured, authorized consumer group), and there is a scenario where
>>>> we want to use Spark Streaming to consume from secured Kafka. So we have
>>>> decided to use the spark-streaming-kafka-0-10
>>>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>>> API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API,
>>>> etc.). When I deployed the application in cluster mode, I realized that
>>>> the actual group id has been prefixed with "spark-executor" in the
>>>> executor configuration (the executor tries to connect to Kafka with
>>>> "spark-executor" + actual group id, which does not really exist, and
>>>> gets an exception).
>>>>
>>>> *Here is the code where the executor constructs the executor-specific group id*
>>>>
>>>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>>> line # 212,
>>>>
>>>> *Here are my questions*
>>>>
>>>> #1 What was the specific reason for prefixing the group id in the executor?
>>>>
>>>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>>>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>>> library by removing the group id prefix?
>>>>
>>>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is
>>>> it advisable to use in production?
>>>>
>>>> *Here is my Spark Streaming code snippet*
>>>>
>>>> val kafkaParams = Map[String, Object](
>>>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>>>   "security.protocol" -> "SSL",
>>>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>>>> )
>>>>
>>>> val stream = KafkaUtils.createDirectStream[String, Message](
>>>>   ssc,
>>>>   PreferConsistent,
>>>>   Subscribe[String, Message](topicsSet, kafkaParams)
>>>> )
>>>>
>>>> ---
>>>> Thanks in Advance,
>>>> Sethupathi.T
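
For readers following the thread, the executor-side group id rewrite discussed above can be sketched in plain Scala, with no Spark or Kafka dependency. This is only an illustration, not the Spark source: the object and method names are hypothetical, the group id "my-authorized-group" is made up, and the exact prefix string ("spark-executor-", with a trailing hyphen) is an assumption based on a reading of KafkaUtils.scala at v2.2.0; the thread itself only says "spark-executor".

```scala
// Sketch only: imitates how the executor-side consumer group id appears to be
// derived from the driver's group id. All names here are hypothetical.
object ExecutorGroupIdSketch {
  // Assumed prefix (reading of KafkaUtils.scala at v2.2.0, not confirmed by the thread).
  val ExecutorPrefix = "spark-executor-"

  /** Returns a copy of the driver's Kafka params with the group id an executor would use. */
  def executorParams(driverParams: Map[String, Object]): Map[String, Object] = {
    // A missing group.id becomes null, which string concatenation renders as "null".
    val original: Object = driverParams.getOrElse("group.id", null)
    driverParams + ("group.id" -> (ExecutorPrefix + original))
  }

  def main(args: Array[String]): Unit = {
    val driver = Map[String, Object]("group.id" -> "my-authorized-group")
    val executor = executorParams(driver)
    println(s"driver group.id   = ${driver("group.id")}")
    println(s"executor group.id = ${executor("group.id")}")
  }
}
```

If the assumption about the prefix holds, a cluster that authorizes only the original group id would reject the executor consumers, which matches the exception reported in the first message. Authorizing the derived group id as well would be one alternative to patching the connector.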