This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dfed439 [SPARK-26432][CORE] Obtain HBase delegation token operation compatible with HBase 2.x.x version API

dfed439 is described below

commit dfed439e33b7bf224dd412b0960402068d961c7b
Author: s71955 <sujithchacko.2...@gmail.com>
AuthorDate: Mon Jan 28 10:08:23 2019 -0800

[SPARK-26432][CORE] Obtain HBase delegation token operation compatible with HBase 2.x.x version API

## What changes were proposed in this pull request?

When obtaining a token from the HBase service, Spark uses a deprecated HBase API:
```public static Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf)```

This deprecated API has already been removed in HBase 2.x as part of the 2.x major release (https://issues.apache.org/jira/browse/HBASE-14713). The TokenUtil class offers a stable alternative, ```public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)```, and Spark should use it to get the delegation token. To invoke it, a connection object must first be retrieved from ConnectionFactory; that same connection is then passed to obtainToken(Connection conn) to get the token.

For example, call ```public static Connection createConnection(Configuration conf)```, then call ```public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)```.

## How was this patch tested?

Manual testing has been done.

Manual test result:

Before fix:

![hbase-dep-obtaintok 1](https://user-images.githubusercontent.com/12999161/50699264-64cac200-106d-11e9-81b4-e50ae8097f27.png)

After fix:

1. Create 2 tables in the HBase shell:
   - Launch the HBase shell.
   - Enter the commands to create the tables and load data: `create 'table1','cf'`, `put 'table1','row1','cf:cid','20'`, `create 'table2','cf'`, `put 'table2','row1','cf:cid','30'`.
   - Show the values: `get 'table1','row1','cf:cid'` displays the value 20, and `get 'table2','row1','cf:cid'` displays the value 30.
2. Run the SparkHbasetoHbase class in testSpark.jar using spark-submit:
   `spark-submit --master yarn-cluster --class com.mrs.example.spark.SparkHbasetoHbase --conf "spark.yarn.security.credentials.hbase.enabled"="true" --conf "spark.security.credentials.hbase.enabled"="true" --keytab /opt/client/user.keytab --principal sen testSpark.jar`

The SparkHbasetoHbase test class updates the value in table2 with the sum of the values in table1 and table2 (table2 = table1 + table2, i.e. 20 + 30 = 50 for row1).

As the snapshot shows, the Spark job was able to interact with the HBase service and update the value in table2.

![obtaintok_success 1](https://user-images.githubusercontent.com/12999161/50699393-bd9a5a80-106d-11e9-96c6-6c250d561efa.png)

Closes #23429 from sujith71955/master_hbase_service.
Authored-by: s71955 <sujithchacko.2...@gmail.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

---
 .../security/HBaseDelegationTokenProvider.scala | 56 ++++++++++++++++++++--
 1 file changed, 52 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index 3bf8c14..e345b0b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.deploy.security

+import java.io.Closeable
+
 import scala.reflect.runtime.universe
 import scala.util.control.NonFatal

@@ -42,8 +44,8 @@ private[security] class HBaseDelegationTokenProvider
     try {
       val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
       val obtainToken = mirror.classLoader.
-        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
-        getMethod("obtainToken", classOf[Configuration])
+        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil")
+        .getMethod("obtainToken", classOf[Configuration])

       logDebug("Attempting to fetch HBase security token.")
       val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
@@ -52,12 +54,58 @@ private[security] class HBaseDelegationTokenProvider
       creds.addToken(token.getService, token)
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to get token from service $serviceName", e)
+        logWarning(s"Failed to get token from service $serviceName due to " + e +
+          s" Retrying to fetch HBase security token with hbase connection parameter.")
+        // Seems to be spark is trying to get the token from HBase 2.x.x version or above where the
+        // obtainToken(Configuration conf) API has been removed. Lets try obtaining the token from
+        // another compatible API of HBase service.
+        obtainDelegationTokensWithHBaseConn(hadoopConf, creds)
     }
-
     None
   }

+  /**
+   * Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf) is a deprecated
+   * method and in Hbase 2.0.0 the method is already removed.
+   * The HBase client API used in below method is introduced from HBase 0.98.9 version,
+   * to invoke this api first connection object has to be retrieved from ConnectionFactory and the
+   * same connection can be passed to
+   * Token<AuthenticationTokenIdentifier> obtainToken(Connection conn) API
+   *
+   * @param hadoopConf Configuration of current Hadoop Compatible system.
+   * @param creds Credentials to add tokens and security keys to.
+   */
+  private def obtainDelegationTokensWithHBaseConn(
+      hadoopConf: Configuration,
+      creds: Credentials): Unit = {
+    var hbaseConnection : Closeable = null
+    try {
+      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+      val connectionFactoryClass = mirror.classLoader
+        .loadClass("org.apache.hadoop.hbase.client.ConnectionFactory")
+        .getMethod("createConnection", classOf[Configuration])
+      hbaseConnection = connectionFactoryClass.invoke(null, hbaseConf(hadoopConf))
+        .asInstanceOf[Closeable]
+      val connectionParamTypeClassRef = mirror.classLoader
+        .loadClass("org.apache.hadoop.hbase.client.Connection")
+      val obtainTokenMethod = mirror.classLoader
+        .loadClass("org.apache.hadoop.hbase.security.token.TokenUtil")
+        .getMethod("obtainToken", connectionParamTypeClassRef)
+      logDebug("Attempting to fetch HBase security token.")
+      val token = obtainTokenMethod.invoke(null, hbaseConnection)
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+      logInfo(s"Get token from HBase: ${token.toString}")
+      creds.addToken(token.getService, token)
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to get token from service $serviceName", e)
+    } finally {
+      if (null != hbaseConnection) {
+        hbaseConnection.close()
+      }
+    }
+  }
+
   override def delegationTokensRequired(
       sparkConf: SparkConf,
       hadoopConf: Configuration): Boolean = {
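The fallback added in this commit (try `obtainToken(Configuration)` first; if that fails, for example because `getMethod` throws `NoSuchMethodException` on HBase 2.x, open a connection via `ConnectionFactory.createConnection`, call `obtainToken(Connection)`, and close the connection in a `finally` block) can be illustrated without any HBase dependency. This is a minimal sketch of the control flow only, not Spark's implementation: `FakeToken`, `FakeConnection`, `TokenFallbackSketch` and `obtainWithFallback` are hypothetical names, and the three function parameters stand in for the reflective calls in the diff.

```scala
import java.io.Closeable
import scala.util.control.NonFatal

// Hypothetical stand-in for org.apache.hadoop.security.token.Token.
final case class FakeToken(service: String)

// Hypothetical stand-in for an HBase Connection; records whether close() ran.
final class FakeConnection extends Closeable {
  var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

object TokenFallbackSketch {
  /**
   * Hypothetical helper mirroring the patch's control flow:
   *  1. try the legacy, Configuration-based lookup;
   *  2. if it throws (e.g. NoSuchMethodException from getMethod on HBase 2.x),
   *     open a connection and use the Connection-based lookup;
   *  3. close the connection in `finally`, whether or not step 2 succeeded.
   * Returns None when both attempts fail (the patch only logs a warning instead).
   */
  def obtainWithFallback[C <: Closeable, T](
      legacy: () => T,
      openConnection: () => C,
      viaConnection: C => T): Option[T] = {
    try {
      Some(legacy())
    } catch {
      case NonFatal(_) =>
        // Set only after openConnection() succeeds, so close() runs only on a real connection.
        var conn: Option[C] = None
        try {
          val c = openConnection()
          conn = Some(c)
          Some(viaConnection(c))
        } catch {
          case NonFatal(_) => None
        } finally {
          conn.foreach(_.close())
        }
    }
  }
}
```

Mapped onto the patch, `legacy` would wrap the reflective `TokenUtil.obtainToken(Configuration)` call, `openConnection` the reflective `ConnectionFactory.createConnection(Configuration)` call, and `viaConnection` the reflective `TokenUtil.obtainToken(Connection)` call. Holding the connection in an `Option` instead of a nullable `Closeable` (as the patch does with `var hbaseConnection : Closeable = null`) is a stylistic choice; both ensure `close()` is only called on a connection that was actually created.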