[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r330883064

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -46,6 +47,13 @@ private[kafka010] class InternalKafkaConsumer(
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+  private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
+    SparkEnv.get.conf, kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      .asInstanceOf[String])
+
+  // Kafka consumer is not able to give back the params instantiated with so we need to store it.
+  // It must be updated all the time when new consumer created.

Review comment:
Fixed. BTW, I also tried to re-open that conversation just to test it, but nothing happened when I clicked the button. I wonder what is going on with GitHub nowadays...

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
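The quoted comment explains why the wrapper has to remember its own parameters: once the Kafka consumer is constructed, it does not expose the configuration it was built with. Here is a minimal Scala sketch of that bookkeeping. It assumes a hypothetical `OpaqueClient` in place of `KafkaConsumer` and a hypothetical `TrackedClient` in place of `InternalKafkaConsumer`, and it is not the PR's code:

```scala
import java.{util => ju}

// Hypothetical stand-in for KafkaConsumer: it consumes its configuration
// in the constructor and offers no getter to read it back afterwards.
final class OpaqueClient(config: ju.Map[String, Object]) {
  def close(): Unit = ()
}

// Hypothetical wrapper that records the exact params each client was built
// with, so a later check can compare them against the current token.
final class TrackedClient(buildParams: () => ju.Map[String, Object]) {
  private var client: Option[OpaqueClient] = None
  private var paramsUsed: Option[ju.Map[String, Object]] = None

  /** Params of the client currently held, if any. */
  def paramsOfCurrentClient: Option[ju.Map[String, Object]] = paramsUsed

  /** Closes the old client (if any) and builds a new one from fresh params. */
  def recreate(): OpaqueClient = {
    client.foreach(_.close())
    val params = buildParams() // e.g. base params plus current security settings
    paramsUsed = Some(params)  // refreshed on every (re)creation, never cached once
    val fresh = new OpaqueClient(params)
    client = Some(fresh)
    fresh
  }
}
```

Because `paramsUsed` is assigned in the same method that builds the client, the stored params cannot drift away from the ones the live client actually uses. That is the "must be updated whenever a new consumer is created" requirement from the comment.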
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r330360852

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -516,13 +524,25 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    require(_consumer.isDefined, "Consumer must be defined")
+    if (!KafkaTokenUtil.isConnectorUsingCurrentToken(_consumer.get.kafkaParamsWithSecurity,
+        _consumer.get.clusterConfig)) {

Review comment:
Fixed.
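The revised `getOrRetrieveConsumer` borrows a consumer lazily and then discards it if it was built with an outdated token. The Scala sketch below models that flow with a hypothetical in-memory pool (`TokenAwarePool`), a hypothetical `ConsumerHandle`, and a `currentToken` supplier standing in for the delegation token in `UGI`. The real code goes through `consumerPool` and `KafkaTokenUtil.isConnectorUsingCurrentToken` instead:

```scala
import scala.collection.mutable

// Hypothetical pooled object that remembers which token it was built with.
final class PooledConsumer(val key: String, val tokenUsed: String)

// Hypothetical pool keeping at most one idle instance per key.
final class TokenAwarePool(currentToken: () => String) {
  private val idle = mutable.Map.empty[String, PooledConsumer]

  def borrow(key: String): PooledConsumer =
    idle.remove(key).getOrElse(new PooledConsumer(key, currentToken()))

  def giveBack(c: PooledConsumer): Unit = idle.update(c.key, c)

  def invalidateKey(key: String): Unit = { idle.remove(key); () }
}

// Hypothetical handle mirroring the lazy get-or-retrieve flow.
final class ConsumerHandle(pool: TokenAwarePool, key: String, currentToken: () => String) {
  private var underlying: Option[PooledConsumer] = None

  def getOrRetrieve(): PooledConsumer = {
    if (underlying.isEmpty) underlying = Some(pool.borrow(key))
    if (underlying.get.tokenUsed != currentToken()) {
      // Stale token: discard this instance and any idle one under the same key,
      // then borrow again so a consumer with the current token gets created.
      underlying = None
      pool.invalidateKey(key)
      underlying = Some(pool.borrow(key))
    }
    underlying.get
  }

  def release(): Unit = {
    underlying.foreach(pool.giveBack)
    underlying = None
  }
}
```

With an unchanged token, a second handle gets back the very same pooled instance. After the token changes, the next `getOrRetrieve()` yields a fresh instance. These are the two behaviours the PR's new test cases assert.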
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r330360862

## File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala ##
@@ -288,4 +289,18 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
+
+  def isConnectorUsingCurrentToken(
+      params: ju.Map[String, Object],
+      clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by connector, checking if uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      require(clusterConfig.isDefined, "Delegation token must exist for this connector.")
+      val currentJaasParams = getTokenJaasParams(clusterConfig.get)
+      consumerJaasParams.equals(currentJaasParams)

Review comment:
Fixed.
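The check compares the JAAS string the connector was created with against the one that would be generated from the current token. The sketch below shows that decision in plain Scala. It assumes a hypothetical `ClusterRef` case class and a caller-supplied `jaasFor` function in place of `KafkaTokenClusterConf` and `getTokenJaasParams`. The `true` result for connectors without `sasl.jaas.config` is an assumption, since the quoted hunk ends before the `else` branch:

```scala
import java.{util => ju}

// Hypothetical stand-in for KafkaTokenClusterConf.
final case class ClusterRef(identifier: String)

object TokenCheck {
  // Same key as Kafka's SaslConfigs.SASL_JAAS_CONFIG.
  val SaslJaasConfig = "sasl.jaas.config"

  def isUsingCurrentToken(
      params: ju.Map[String, Object],
      clusterConfig: Option[ClusterRef],
      jaasFor: ClusterRef => String): Boolean = {
    if (params.containsKey(SaslJaasConfig)) {
      val connectorJaas = params.get(SaslJaasConfig).asInstanceOf[String]
      // Fail fast with a readable message rather than a bare None.get.
      require(clusterConfig.isDefined, "Delegation token must exist for this connector.")
      connectorJaas == jaasFor(clusterConfig.get)
    } else {
      // No JAAS entry: the connector does not authenticate with a delegation
      // token, so there is nothing to refresh (assumed behaviour).
      true
    }
  }
}
```

A plain string comparison is enough here because the JAAS line embeds the token identifier and password. Any renewal therefore produces a different string.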
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329771200

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -46,6 +47,10 @@ private[kafka010] class InternalKafkaConsumer(
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+  // These fields must be updated whenever a new consumer is created.
+  private[kafka010] var clusterConfig: Option[KafkaTokenClusterConf] = _

Review comment:
You understand it correctly: it contains only configuration. What I mean is this: if the delegation token for a specific cluster never arrives because the token provider throws an exception, the current code won't find any config object. The end result is an empty JAAS config on the consumer, which causes a loud crash (I like it when things blow up fast and with noise). With the suggested cached version, the code would throw an `NPE` [here](https://github.com/apache/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala#L275). From a debugging point of view it's best to communicate through meaningful exceptions (not an `NPE`, of course), so I think it makes sense to make the change and add a `require` constraint to `getTokenJaasParams`.
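The proposal is to let `getTokenJaasParams` reject a missing token with an explicit `require` instead of failing later with an `NPE`. Below is a Scala sketch of that fail-fast shape. It assumes a hypothetical `Token` case class and an in-memory `tokens` map keyed by cluster identifier in place of the `UGI` credentials. The JAAS line is modelled loosely on the SCRAM login module used for Kafka delegation tokens, and the exact string Spark generates may differ:

```scala
// Hypothetical delegation token: identifier and password as plain strings.
final case class Token(id: String, password: String)

object JaasParams {
  private val LoginModule = "org.apache.kafka.common.security.scram.ScramLoginModule"

  /** Builds SCRAM JAAS params for `clusterId`, failing fast if no token exists. */
  def tokenJaasParams(tokens: Map[String, Token], clusterId: String): String = {
    val token = tokens.get(clusterId)
    // Without this check a missing token would surface later as an NPE or
    // NoSuchElementException, far away from the actual cause.
    require(token.isDefined, s"Token for identifier $clusterId must exist")
    val t = token.get
    s"""$LoginModule required tokenauth=true username="${t.id}" password="${t.password}";"""
  }
}
```

The resulting `IllegalArgumentException` names the missing cluster identifier. This is the "communicate through exceptions" debugging aid the comment argues for.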
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329471858

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -46,6 +47,10 @@ private[kafka010] class InternalKafkaConsumer(
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+  // These fields must be updated whenever a new consumer is created.
+  private[kafka010] var clusterConfig: Option[KafkaTokenClusterConf] = _

Review comment:
This value also depends on the delegation tokens currently in `UGI`, so I would keep it.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329471403

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -106,10 +111,12 @@ private[kafka010] class InternalKafkaConsumer(
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
-    val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+    clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(SparkEnv.get.conf,
+      kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).asInstanceOf[String])
+    kafkaParamsWithSecurity = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)

Review comment:
Nice catch, fixed.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329029986

## File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala ##
@@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by connector, checking if uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = findMatchingTokenClusterConfig(SparkEnv.get.conf,

Review comment:
I tried out a couple of things and ended up with a solution similar to what you suggested. I considered the current code and a couple of further improvements, and `clusterConfig` is the key piece that all of these places need.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328710935

## File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala ##
@@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by connector, checking if uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = findMatchingTokenClusterConfig(SparkEnv.get.conf,

Review comment:
At the moment I'm just playing with it to see which options would make it better (that's why I haven't resolved the conversation)...
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328658668

## File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala ##
@@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached connector, checking if uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(SparkEnv.get.conf,

Review comment:
Removed.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328658126

## File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala ##
@@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached connector, checking if uses the latest token.")

Review comment:
Yep, removed.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328656490

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala ##
@@ -96,47 +103,86 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
       val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)
-      val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer1.getAvailableOffsetRange()
-
-      val consumer1Underlying = consumer1._consumer
-      assert(consumer1Underlying.isDefined)
-
-      consumer1.release()
-
-      assert(consumerPool.size(key) === 1)
-      // check whether acquired object is available in pool
-      val pooledObj = consumerPool.borrowObject(key, kafkaParams)
-      assert(consumer1Underlying.get.eq(pooledObj))
-      consumerPool.returnObject(pooledObj)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
       val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
       TaskContext.setTaskContext(context2)
-      val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer2.getAvailableOffsetRange()
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
-      val consumer2Underlying = consumer2._consumer
-      assert(consumer2Underlying.isDefined)
       // here we expect different consumer as pool will invalidate for task reattempt
       assert(consumer2Underlying.get.ne(consumer1Underlying.get))
+    } finally {
+      TaskContext.unset()
+    }
+  }
-      consumer2.release()
+  test("same KafkaDataConsumer instance in case of same token") {
+    try {
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
-      // The first consumer should be removed from cache, but the consumer after invalidate
-      // should be cached.
-      assert(consumerPool.size(key) === 1)
-      val pooledObj2 = consumerPool.borrowObject(key, kafkaParams)
-      assert(consumer2Underlying.get.eq(pooledObj2))
-      consumerPool.returnObject(pooledObj2)
+      val context = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context)
+      setSparkEnv(
+        Map(
+          s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
+        )
+      )
+      addTokenToUGI(tokenService1, tokenId1, tokenPassword1)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
+
+      assert(consumer2Underlying.get.eq(consumer1Underlying.get))
     } finally {
       TaskContext.unset()
     }
   }
+  test("new KafkaDataConsumer instance in case of token renewal") {
+    try {
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
+
+      val context = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context)
+      setSparkEnv(
+        Map(
+          s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
+        )
+      )
+      addTokenToUGI(tokenService1, tokenId1, tokenPassword1)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
+      addTokenToUGI(tokenService1, tokenId2, tokenPassword2)
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
+
+      assert(consumer2Underlying.get.ne(consumer1Underlying.get))
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
+  private def initSingleConsumer(
+      kafkaParams: ju.Map[String, Object],
+      key: CacheKey): Option[InternalKafkaConsumer] = {
+    val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
+
+    // any method call which requires consumer is necessary
+    consumer.getOrRetrieveConsumer()
+
+    val consumerUnderlying = consumer._consumer
+    assert(consumerUnderlying.isDefined)

Review comment:
Good simplification, removed.
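The refactored tests rely on two patterns. `initSingleConsumer` forces the lazily created consumer into existence before inspecting it, and every test unsets the thread-local `TaskContext` in `finally`. Here is a self-contained sketch of both. It assumes a hypothetical `LazyHandle` in place of `KafkaDataConsumer`, a hypothetical `ThreadLocal`-based `FakeTaskContext` in place of Spark's `TaskContext`, and a hypothetical `TestHelpers` object:

```scala
// Hypothetical stand-in for TaskContext's thread-local storage.
object FakeTaskContext {
  private val current = new ThreadLocal[String]
  def set(attempt: String): Unit = current.set(attempt)
  def get: Option[String] = Option(current.get)
  def unset(): Unit = current.remove()
}

// Hypothetical handle whose underlying object exists only after first use,
// like KafkaDataConsumer._consumer before getOrRetrieveConsumer() is called.
final class LazyHandle(create: () => AnyRef) {
  var underlying: Option[AnyRef] = None
  def getOrRetrieve(): AnyRef = {
    if (underlying.isEmpty) underlying = Some(create())
    underlying.get
  }
}

object TestHelpers {
  // Forces initialization and hands back the underlying object for identity checks.
  def initSingle(handle: LazyHandle): Option[AnyRef] = {
    handle.getOrRetrieve()
    assert(handle.underlying.isDefined)
    handle.underlying
  }

  // Runs a test body with a context set, always cleaning up the thread-local.
  def withContext[T](attempt: String)(body: => T): T = {
    FakeTaskContext.set(attempt)
    try body finally FakeTaskContext.unset()
  }
}
```

Cleaning up in `finally` keeps a failed assertion from leaking one test's context into the next test that runs on the same thread.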
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326592824

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -516,13 +521,39 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    if (!isConsumerUsingCurrentToken) {
+      logDebug("Cached consumer uses and old delegation token, invalidating.")
+      releaseConsumer()
+      consumerPool.invalidateKey(cacheKey)
+      fetchedDataPool.invalidate(cacheKey)
+      retrieveConsumer()
+    }
+    _consumer.get
+  }
+
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
-    case Some(consumer) => consumer
+  private def isConsumerUsingCurrentToken: Boolean = {

Review comment:
Moved.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326550724

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -516,13 +521,39 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    if (!isConsumerUsingCurrentToken) {
+      logDebug("Cached consumer uses and old delegation token, invalidating.")
+      releaseConsumer()
+      consumerPool.invalidateKey(cacheKey)
+      fetchedDataPool.invalidate(cacheKey)
+      retrieveConsumer()
+    }
+    _consumer.get
+  }
+
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
-    case Some(consumer) => consumer
+  private def isConsumerUsingCurrentToken: Boolean = {

Review comment:
I think it's a good idea, let me check...
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326319627

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()

Review comment:
I've adopted this approach; it made the code much simpler and better structured. Thanks!
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326319392

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
+    _consumer.get
+  }
-    case Some(consumer) => consumer
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
+
+  private def ensureConsumerHasLatestToken(): Unit = {
+    require(_consumer.isDefined, "Consumer must be defined")
+    val params = _consumer.get.kafkaParamsWithSecurity
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached consumer, checking if uses the latest token.")
+
+      val jaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf,

Review comment:
I've restructured this part of the code. Please have a look.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326269926

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ##
@@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
+    _consumer.get
+  }
-    case Some(consumer) => consumer
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
+
+  private def ensureConsumerHasLatestToken(): Unit = {
+    require(_consumer.isDefined, "Consumer must be defined")
+    val params = _consumer.get.kafkaParamsWithSecurity
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached consumer, checking if uses the latest token.")
+
+      val jaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf,

Review comment:
It took me a while to recall the original intention behind `KafkaTokenUtil.findMatchingToken`, so something can definitely be improved. In short, `KafkaTokenUtil.findMatchingToken` finds a token in `UGI` and returns the information that belongs to that token, currently the cluster config plus the token itself. Let me check how we can make it better...
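As described, the lookup returns configuration only for a cluster whose token is actually present, so its result changes when tokens arrive. Here is a rough Scala sketch of such a lookup. It assumes hypothetical names throughout: a `ClusterConf` with a `targetServersRegex`, a `ClusterLookup.findMatching` function, and a `tokenStore` map standing in for the credentials in `UGI`. Treat the matching rules as an illustration, not Spark's exact semantics:

```scala
// Hypothetical cluster configuration: which bootstrap servers it covers.
final case class ClusterConf(identifier: String, targetServersRegex: String)

object ClusterLookup {
  /**
   * Returns the single cluster config that matches one of the comma-separated
   * bootstrap servers AND currently has a token in `tokenStore`.
   */
  def findMatching(
      configs: Seq[ClusterConf],
      tokenStore: Map[String, String],
      bootstrapServers: String): Option[ClusterConf] = {
    val servers = bootstrapServers.split(",").map(_.trim)
    val matching = configs.filter { conf =>
      servers.exists(_.matches(conf.targetServersRegex)) &&
        tokenStore.contains(conf.identifier)
    }
    require(matching.size <= 1,
      s"More than one delegation token matches bootstrap servers: $bootstrapServers")
    matching.headOption
  }
}
```

Because the token store is part of the input, the same bootstrap servers can yield `None` before a token arrives and `Some(config)` afterwards. This is why the comment treats the result as token-dependent state rather than pure configuration.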
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326277430

## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
## @@ -96,47 +103,86 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
       val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)
-      val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer1.getAvailableOffsetRange()
-
-      val consumer1Underlying = consumer1._consumer
-      assert(consumer1Underlying.isDefined)
-
-      consumer1.release()
-
-      assert(consumerPool.size(key) === 1)
-      // check whether acquired object is available in pool
-      val pooledObj = consumerPool.borrowObject(key, kafkaParams)
-      assert(consumer1Underlying.get.eq(pooledObj))
-      consumerPool.returnObject(pooledObj)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
       val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
       TaskContext.setTaskContext(context2)
-      val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer2.getAvailableOffsetRange()
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
-      val consumer2Underlying = consumer2._consumer
-      assert(consumer2Underlying.isDefined)
       // here we expect different consumer as pool will invalidate for task reattempt
       assert(consumer2Underlying.get.ne(consumer1Underlying.get))
+    } finally {
+      TaskContext.unset()
+    }
+  }
-      consumer2.release()
+  test("same KafkaDataConsumer instance in case of same token") {
+    try {
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
-      // The first consumer should be removed from cache, but the consumer after invalidate

Review comment:
The comment has been deleted, but the functionality has been moved into `initSingleConsumer` (I felt the original comment didn't fit there).
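The steps deleted from the first test above (acquire, touch the consumer, check `_consumer`, release, then confirm the pool holds exactly that instance) can be sketched as one stand-alone helper. This is a minimal, self-contained model under stated assumptions, not the actual Spark test code: `ToyPool`, `ToyInternalConsumer`, `ToyDataConsumer`, and this version of `initSingleConsumer` are hypothetical stand-ins for `consumerPool`, `InternalKafkaConsumer`, `KafkaDataConsumer`, and the real helper, and no Kafka or Spark classes are used.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a pooled InternalKafkaConsumer.
final class ToyInternalConsumer(val key: String)

// Hypothetical keyed pool: borrowObject reuses an idle instance for the key
// (or creates one), returnObject marks the instance idle again.
final class ToyPool {
  private val idle = mutable.Map.empty[String, List[ToyInternalConsumer]]

  def borrowObject(key: String): ToyInternalConsumer =
    idle.getOrElse(key, Nil) match {
      case head :: tail =>
        idle(key) = tail
        head
      case Nil =>
        new ToyInternalConsumer(key)
    }

  def returnObject(obj: ToyInternalConsumer): Unit =
    idle(obj.key) = obj :: idle.getOrElse(obj.key, Nil)

  // Number of idle instances currently held for the key.
  def size(key: String): Int = idle.getOrElse(key, Nil).size
}

// Hypothetical stand-in for KafkaDataConsumer: borrows lazily, returns on release.
final class ToyDataConsumer(pool: ToyPool, key: String) {
  var _consumer: Option[ToyInternalConsumer] = None

  // Any call that needs the underlying consumer borrows it lazily.
  def getAvailableOffsetRange(): Unit =
    if (_consumer.isEmpty) _consumer = Some(pool.borrowObject(key))

  // Hands the underlying consumer back to the pool.
  def release(): Unit = {
    _consumer.foreach(pool.returnObject)
    _consumer = None
  }
}

object ToyHelpers {
  // Mirrors the deleted test steps: acquire, use, release, and check that the
  // pool now holds exactly the instance that was used.
  def initSingleConsumer(pool: ToyPool, key: String): Option[ToyInternalConsumer] = {
    val consumer = new ToyDataConsumer(pool, key)
    consumer.getAvailableOffsetRange()
    val underlying = consumer._consumer
    assert(underlying.isDefined)
    consumer.release()
    assert(pool.size(key) == 1)
    val pooledObj = pool.borrowObject(key)
    assert(underlying.get.eq(pooledObj))
    pool.returnObject(pooledObj)
    underlying
  }
}
```

The toy pool never invalidates, so calling the helper twice returns the same instance; the real pool additionally invalidates on task reattempt, which is why the first test expects `ne`.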
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326269926

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
## @@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
+    _consumer.get
+  }
-    case Some(consumer) => consumer
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
+
+  private def ensureConsumerHasLatestToken(): Unit = {
+    require(_consumer.isDefined, "Consumer must be defined")
+    val params = _consumer.get.kafkaParamsWithSecurity
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached consumer, checking if uses the latest token.")
+
+      val jaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf,

Review comment:
It took me a while to recall the original intention behind `KafkaTokenUtil.findMatchingToken`, so something here can definitely be improved. In short, `KafkaTokenUtil.findMatchingToken` finds a token in `UGI` and returns the information that belongs to that token, which is currently the config plus the token. Let me check how we can make it better...
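The check under discussion can be illustrated with a small model: the cached consumer remembers the JAAS config it was built with, and before each use that config is compared with the one derived from the current token; on a mismatch the consumer is dropped and rebuilt. This is a hypothetical sketch, not Spark's implementation: `TokenSource` stands in for the `UGI` lookup done by `KafkaTokenUtil`, the JAAS string is a placeholder rather than Kafka's real format, and invalidation is modelled as simply replacing the instance instead of invalidating it in the pool.

```scala
// Hypothetical stand-in for a cached consumer. Like the real wrapper, it has to
// remember the JAAS config it was built with, because the Kafka client cannot
// report it back.
final class TokenAwareConsumer(val jaasConfig: String)

// Hypothetical stand-in for the token lookup KafkaTokenUtil does against the UGI.
final class TokenSource(var currentToken: String) {
  // Placeholder string, not Kafka's real JAAS format.
  def currentJaasConfig: String = s"jaas(token=$currentToken)"
}

final class TokenCheckingDataConsumer(tokens: TokenSource) {
  private var _consumer: Option[TokenAwareConsumer] = None

  // Counts how often a stale consumer was replaced (exposed for the example).
  var invalidations: Int = 0

  def getOrRetrieveConsumer(): TokenAwareConsumer = {
    if (_consumer.isEmpty) retrieveConsumer()
    ensureConsumerHasLatestToken()
    _consumer.get
  }

  private def retrieveConsumer(): Unit =
    _consumer = Some(new TokenAwareConsumer(tokens.currentJaasConfig))

  // Rebuild the consumer when it was created with an older token, so the next
  // request authenticates with the latest one.
  private def ensureConsumerHasLatestToken(): Unit = {
    require(_consumer.isDefined, "Consumer must be defined")
    if (_consumer.get.jaasConfig != tokens.currentJaasConfig) {
      invalidations += 1
      _consumer = None
      retrieveConsumer()
    }
  }
}
```

As long as the token does not change, repeated calls return the same instance; after the token source moves to a new token, the next call returns a fresh consumer built with it.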
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326268320

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
## @@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()

Review comment:
Will consider this during the refactor.
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326267775

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
## @@ -46,6 +48,9 @@ private[kafka010] class InternalKafkaConsumer(
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+  // Kafka consumer is not able to give back the params instantiated with so we need to store it.
+  // It must be updated all the time when new consumer created.

Review comment:
Fixed.
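The reasoning in the code comment above (the Kafka consumer cannot give back the params it was created with, so they must be stored and refreshed whenever a new consumer is created) can be sketched by assigning the client and its params in exactly one place. This is a minimal illustration under stated assumptions: `OpaqueClient` and `ParamsTrackingConsumer` are hypothetical names, not Spark or Kafka classes, and the params are modelled as a plain `Map[String, String]`.

```scala
// Hypothetical client that, like the Kafka consumer, offers no way to read
// back the params it was constructed with.
final class OpaqueClient(params: Map[String, String]) {
  def close(): Unit = ()
}

// Hypothetical wrapper: the client and the params it was built from are only
// assigned together in recreate(), so the stored copy cannot go stale.
final class ParamsTrackingConsumer(buildParams: () => Map[String, String]) {
  private var client: Option[OpaqueClient] = None
  private var storedParams: Map[String, String] = Map.empty

  recreate()

  // The params the current client was created with.
  def paramsWithSecurity: Map[String, String] = storedParams

  // Close the old client (if any), then build a new one and record its params.
  def recreate(): Unit = {
    client.foreach(_.close())
    val params = buildParams()
    client = Some(new OpaqueClient(params))
    storedParams = params
  }
}
```

Routing every (re)creation through a single method is one way to honour the "must be updated every time a new consumer is created" rule: the stored params only change when the client they describe changes.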