[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-10-02 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r330883064
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -46,6 +47,13 @@ private[kafka010] class InternalKafkaConsumer(
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
+  private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
+    SparkEnv.get.conf,
+    kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).asInstanceOf[String])
+
+  // The Kafka consumer is not able to give back the params it was instantiated with,
+  // so we need to store them. They must be updated whenever a new consumer is created.
 
 Review comment:
   Fixed.
   
   BTW, I've also tried to re-open that conversation just to test it, and even though I clicked the button nothing happened. Interesting what's going on with GitHub nowadays...





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-10-01 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r330360852
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +524,25 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    require(_consumer.isDefined, "Consumer must be defined")
+    if (!KafkaTokenUtil.isConnectorUsingCurrentToken(_consumer.get.kafkaParamsWithSecurity,
+        _consumer.get.clusterConfig)) {
 
 Review comment:
   Fixed.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-10-01 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r330360862
 
 

 ##
 File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
 ##
 @@ -288,4 +289,18 @@ private[spark] object KafkaTokenUtil extends Logging {
 
     params
   }
+
+  def isConnectorUsingCurrentToken(
+      params: ju.Map[String, Object],
+      clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by connector, checking if it uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      require(clusterConfig.isDefined, "Delegation token must exist for this connector.")
+      val currentJaasParams = getTokenJaasParams(clusterConfig.get)
+      consumerJaasParams.equals(currentJaasParams)
 
 Review comment:
   Fixed.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-30 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329771200
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -46,6 +47,10 @@ private[kafka010] class InternalKafkaConsumer(
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
+  // These fields must be updated whenever a new consumer is created.
+  private[kafka010] var clusterConfig: Option[KafkaTokenClusterConf] = _
 
 Review comment:
   You understand it well, it contains only config stuff.
   What I mean here is that if a delegation token for a specific cluster never arrives because the token provider throws an exception, then the actual code won't find any config object. The end result is an empty JAAS config on the consumer, which will produce a significant crash (I like it when things blow up fast and with noise).
   With the suggested cached version the code would throw an `NPE` [here](https://github.com/apache/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala#L275). From a debugging point of view it's best to communicate through exceptions (but not `NPE`, of course), so I think it makes sense to make the change and add a `require` constraint to `getTokenJaasParams`, along the lines of the sketch below.
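   
   A minimal sketch of that fail-fast idea (hedged: the signature, case class, and returned string are simplified placeholders, not the actual `KafkaTokenUtil` code):
   
   ```scala
   // Sketch only: the case class and the returned string are placeholders.
   case class KafkaTokenClusterConf(identifier: String, bootstrapServers: String)
   
   def getTokenJaasParams(clusterConfig: Option[KafkaTokenClusterConf]): String = {
     // Fail fast with a descriptive error instead of letting a missing token
     // surface later as an NPE or as an empty JAAS config on the consumer.
     require(clusterConfig.isDefined, "Delegation token must exist for this connector.")
     s"jaas-params-for-${clusterConfig.get.identifier}" // illustrative body only
   }
   ```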





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-30 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329471858
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -46,6 +47,10 @@ private[kafka010] class InternalKafkaConsumer(
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
+  // These fields must be updated whenever a new consumer is created.
+  private[kafka010] var clusterConfig: Option[KafkaTokenClusterConf] = _
 
 Review comment:
   This value is influenced by the actual delegation tokens in `UGI` as well, so I would keep it.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-30 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329471403
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -106,10 +111,12 @@ private[kafka010] class InternalKafkaConsumer(
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
-    val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+    clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(SparkEnv.get.conf,
+      kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).asInstanceOf[String])
+    kafkaParamsWithSecurity = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
 
 Review comment:
   Nice catch, fixed.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-27 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r329029986
 
 

 ##
 File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
 ##
 @@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
 
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by connector, checking if it uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = findMatchingTokenClusterConfig(SparkEnv.get.conf,
 
 Review comment:
   Tried out a couple of things and ended up with a solution similar to what you've suggested. I've considered the actual code plus a couple of further improvements, and `clusterConfig` is the key piece that all the call sites need; see the sketch below.
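   
   A rough sketch of that direction (hedged: simplified names and stubbed lookup, not the PR code): resolve `clusterConfig` once when the consumer is created, store it, and pass it explicitly to every check that needs it.
   
   ```scala
   // Sketch only: the config type and the lookup stub are assumptions.
   case class KafkaTokenClusterConf(identifier: String, bootstrapServers: String)
   
   // Stub standing in for the real SparkConf/UGI based lookup.
   def findMatchingTokenClusterConfig(servers: String): Option[KafkaTokenClusterConf] =
     Some(KafkaTokenClusterConf("cluster1", servers))
   
   class ConsumerSketch(bootstrapServers: String) {
     // Resolved once per consumer instance instead of on every token check.
     val clusterConfig: Option[KafkaTokenClusterConf] =
       findMatchingTokenClusterConfig(bootstrapServers)
   }
   
   // Checks then receive the stored config instead of re-deriving it:
   def isConnectorUsingCurrentToken(
       jaasParams: String,
       clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
     require(clusterConfig.isDefined, "Delegation token must exist for this connector.")
     jaasParams.nonEmpty // placeholder comparison only
   }
   ```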





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-26 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328710935
 
 

 ##
 File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
 ##
 @@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
 
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by connector, checking if it uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = findMatchingTokenClusterConfig(SparkEnv.get.conf,
 
 Review comment:
   At the moment I'm just playing with it to see which options would make it better (that's the reason I haven't resolved the conversation yet)...





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-26 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328658668
 
 

 ##
 File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
 ##
 @@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
 
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached connector, checking if it uses the latest token.")
+      val consumerJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(SparkEnv.get.conf,
 
 Review comment:
   Removed.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-26 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328658126
 
 

 ##
 File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
 ##
 @@ -288,4 +288,18 @@ private[spark] object KafkaTokenUtil extends Logging {
 
     params
   }
+
+  def isConnectorUsingCurrentToken(params: ju.Map[String, Object]): Boolean = {
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached connector, checking if it uses the latest token.")
 
 Review comment:
   Yep, removed.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-26 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r328656490
 
 

 ##
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
 ##
 @@ -96,47 +103,86 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
 
       val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)
-      val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer1.getAvailableOffsetRange()
-
-      val consumer1Underlying = consumer1._consumer
-      assert(consumer1Underlying.isDefined)
-
-      consumer1.release()
-
-      assert(consumerPool.size(key) === 1)
-      // check whether acquired object is available in pool
-      val pooledObj = consumerPool.borrowObject(key, kafkaParams)
-      assert(consumer1Underlying.get.eq(pooledObj))
-      consumerPool.returnObject(pooledObj)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
 
       val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
       TaskContext.setTaskContext(context2)
-      val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer2.getAvailableOffsetRange()
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
 
-      val consumer2Underlying = consumer2._consumer
-      assert(consumer2Underlying.isDefined)
       // here we expect different consumer as pool will invalidate for task reattempt
       assert(consumer2Underlying.get.ne(consumer1Underlying.get))
+    } finally {
+      TaskContext.unset()
+    }
+  }
 
-      consumer2.release()
+  test("same KafkaDataConsumer instance in case of same token") {
+    try {
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
 
-      // The first consumer should be removed from cache, but the consumer after invalidate
-      // should be cached.
-      assert(consumerPool.size(key) === 1)
-      val pooledObj2 = consumerPool.borrowObject(key, kafkaParams)
-      assert(consumer2Underlying.get.eq(pooledObj2))
-      consumerPool.returnObject(pooledObj2)
+      val context = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context)
+      setSparkEnv(
+        Map(
+          s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
+        )
+      )
+      addTokenToUGI(tokenService1, tokenId1, tokenPassword1)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
+
+      assert(consumer2Underlying.get.eq(consumer1Underlying.get))
     } finally {
       TaskContext.unset()
     }
   }
 
+  test("new KafkaDataConsumer instance in case of token renewal") {
+    try {
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
+
+      val context = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context)
+      setSparkEnv(
+        Map(
+          s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
+        )
+      )
+      addTokenToUGI(tokenService1, tokenId1, tokenPassword1)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
+      addTokenToUGI(tokenService1, tokenId2, tokenPassword2)
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
+
+      assert(consumer2Underlying.get.ne(consumer1Underlying.get))
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
+  private def initSingleConsumer(
+      kafkaParams: ju.Map[String, Object],
+      key: CacheKey): Option[InternalKafkaConsumer] = {
+    val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
+
+    // any method call which requires the consumer is necessary
+    consumer.getOrRetrieveConsumer()
+
+    val consumerUnderlying = consumer._consumer
+    assert(consumerUnderlying.isDefined)
 Review comment:
   Good simplification, removed.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-20 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326592824
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +521,39 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    if (!isConsumerUsingCurrentToken) {
+      logDebug("Cached consumer uses an old delegation token, invalidating.")
+      releaseConsumer()
+      consumerPool.invalidateKey(cacheKey)
+      fetchedDataPool.invalidate(cacheKey)
+      retrieveConsumer()
+    }
+    _consumer.get
+  }
+
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
 
-    case Some(consumer) => consumer
+  private def isConsumerUsingCurrentToken: Boolean = {
 Review comment:
   Moved.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-20 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326550724
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +521,39 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    if (!isConsumerUsingCurrentToken) {
+      logDebug("Cached consumer uses an old delegation token, invalidating.")
+      releaseConsumer()
+      consumerPool.invalidateKey(cacheKey)
+      fetchedDataPool.invalidate(cacheKey)
+      retrieveConsumer()
+    }
+    _consumer.get
+  }
+
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
 
-    case Some(consumer) => consumer
+  private def isConsumerUsingCurrentToken: Boolean = {
 
 Review comment:
   I think it's a good idea, let me check...





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-19 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326319627
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
 
 Review comment:
   I've used this approach and it made the code much simpler and better structured. Thanks!





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-19 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326319392
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
+    _consumer.get
+  }
 
-    case Some(consumer) => consumer
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
+
+  private def ensureConsumerHasLatestToken(): Unit = {
+    require(_consumer.isDefined, "Consumer must be defined")
+    val params = _consumer.get.kafkaParamsWithSecurity
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached consumer, checking if it uses the latest token.")
+
+      val jaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf,
 
 Review comment:
   Restructured this code part. Please have a look.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-19 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326269926
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
+    _consumer.get
+  }
 
-    case Some(consumer) => consumer
+  private def retrieveConsumer(): Unit = {
+    _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+    require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
+  }
+
+  private def ensureConsumerHasLatestToken(): Unit = {
+    require(_consumer.isDefined, "Consumer must be defined")
+    val params = _consumer.get.kafkaParamsWithSecurity
+    if (params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+      logDebug("Delegation token used by cached consumer, checking if it uses the latest token.")
+
+      val jaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
+      val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf,
 
 Review comment:
   It took a while to recall what the original intention with `KafkaTokenUtil.findMatchingToken` was, so something can definitely be improved. All in all, `KafkaTokenUtil.findMatchingToken` finds a token in `UGI` and gives back the information that belongs to that token, currently config + token. Let me check how we can make it better...
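   
   For intuition, a rough sketch of that lookup behaviour (heavily hedged: the config type, config source, and token service naming are assumptions, not the actual Spark implementation):
   
   ```scala
   import scala.collection.JavaConverters._
   import org.apache.hadoop.io.Text
   import org.apache.hadoop.security.UserGroupInformation
   
   case class KafkaTokenClusterConf(identifier: String, bootstrapServers: String)
   
   def findMatchingTokenClusterConfig(
       clusterConfigs: Seq[KafkaTokenClusterConf], // assumed to come from SparkConf
       bootstrapServers: String): Option[KafkaTokenClusterConf] = {
     // Scan the delegation tokens attached to the current UGI and return the
     // cluster config whose servers match and whose token is actually present.
     val tokens = UserGroupInformation.getCurrentUser.getCredentials.getAllTokens.asScala
     clusterConfigs.find { conf =>
       conf.bootstrapServers == bootstrapServers &&
         tokens.exists(_.getService == new Text(conf.identifier))
     }
   }
   ```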





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-19 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326277430
 
 

 ##
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
 ##
 @@ -96,47 +103,86 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
 
       val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)
-      val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer1.getAvailableOffsetRange()
-
-      val consumer1Underlying = consumer1._consumer
-      assert(consumer1Underlying.isDefined)
-
-      consumer1.release()
-
-      assert(consumerPool.size(key) === 1)
-      // check whether acquired object is available in pool
-      val pooledObj = consumerPool.borrowObject(key, kafkaParams)
-      assert(consumer1Underlying.get.eq(pooledObj))
-      consumerPool.returnObject(pooledObj)
+      val consumer1Underlying = initSingleConsumer(kafkaParams, key)
 
       val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
       TaskContext.setTaskContext(context2)
-      val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
-
-      // any method call which requires consumer is necessary
-      consumer2.getAvailableOffsetRange()
+      val consumer2Underlying = initSingleConsumer(kafkaParams, key)
 
-      val consumer2Underlying = consumer2._consumer
-      assert(consumer2Underlying.isDefined)
       // here we expect different consumer as pool will invalidate for task reattempt
       assert(consumer2Underlying.get.ne(consumer1Underlying.get))
+    } finally {
+      TaskContext.unset()
+    }
+  }
 
-      consumer2.release()
+  test("same KafkaDataConsumer instance in case of same token") {
+    try {
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
 
-      // The first consumer should be removed from cache, but the consumer after invalidate
 
 Review comment:
   The comment has been deleted but the functionality has been moved into `initSingleConsumer` (I had the feeling the original comment didn't fit there).





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-19 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326268320
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -516,13 +521,41 @@ private[kafka010] class KafkaDataConsumer(
     fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
   }
 
-  private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
-    case None =>
-      _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
-      require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
-      _consumer.get
+  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
+    if (!_consumer.isDefined) {
+      retrieveConsumer()
+    }
+    ensureConsumerHasLatestToken()
 
 Review comment:
   Will consider this during the refactor.





[GitHub] [spark] gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available

2019-09-19 Thread GitBox
gaborgsomogyi commented on a change in pull request #25760: [SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760#discussion_r326267775
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##
 @@ -46,6 +48,9 @@ private[kafka010] class InternalKafkaConsumer(
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
+  // The Kafka consumer is not able to give back the params it was instantiated with,
+  // so we need to store them. They must be updated whenever a new consumer is created.
 
 Review comment:
   Fixed.

