[ https://issues.apache.org/jira/browse/HBASE-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461775#comment-16461775 ]
Michael Jin commented on HBASE-20295:
-------------------------------------

A new issue was created: HBASE-20521. [~mdrob], please correct the description if it's not clear.

> TableOutputFormat.checkOutputSpecs throws NullPointerException
> ---------------------------------------------------------------
>
> Key: HBASE-20295
> URL: https://issues.apache.org/jira/browse/HBASE-20295
> Project: HBase
> Issue Type: Bug
> Components: mapreduce
> Affects Versions: 1.4.0
> Environment: Spark 2.2.1, HBase 1.4.0
> Reporter: Michael Jin
> Assignee: Michael Jin
> Priority: Major
> Fix For: 2.0.0
>
> Attachments: HBASE-20295.branch-1.4.001.patch, HBASE-20295.master.001.patch, HBASE-20295.master.002.patch, HBASE-20295.master.003.patch
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I am using Spark to write data to HBase with the RDD.saveAsNewAPIHadoopDataset function. It works fine with HBase 1.3.1, but after updating my HBase dependency to 1.4.0 in pom.xml it throws java.lang.NullPointerException. The exception is caused by a logic error in the TableOutputFormat.checkOutputSpecs function; please see the details below. (A minimal driver sketch that reaches this code path follows the two excerpts below.)
> First, let's take a look at the SparkHadoopMapReduceWriter.write function in SparkHadoopMapReduceWriter.scala:
> {code:java}
> // SparkHadoopMapReduceWriter.write (org.apache.spark.internal.io.SparkHadoopMapReduceWriter.scala)
> def write[K, V: ClassTag](
>     rdd: RDD[(K, V)],
>     hadoopConf: Configuration): Unit = {
>   // Extract context and configuration from RDD.
>   val sparkContext = rdd.context
>   val stageId = rdd.id
>   val sparkConf = rdd.conf
>   val conf = new SerializableConfiguration(hadoopConf)
>
>   // Set up a job.
>   val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
>   val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
>   val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
>   val format = jobContext.getOutputFormatClass
>
>   if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
>     // FileOutputFormat ignores the filesystem parameter
>     val jobFormat = format.newInstance
>     jobFormat.checkOutputSpecs(jobContext)
>   }
>
>   val committer = FileCommitProtocol.instantiate(
>     className = classOf[HadoopMapReduceCommitProtocol].getName,
>     jobId = stageId.toString,
>     outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>     isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
>   committer.setupJob(jobContext)
>   ...{code}
> In the write function, if output spec validation is enabled it calls checkOutputSpecs on the TableOutputFormat class. However, the job format is created simply with "val jobFormat = format.newInstance", which does NOT initialize the "conf" member variable of TableOutputFormat (Class.newInstance never calls setConf). Let's continue with the checkOutputSpecs function in the TableOutputFormat class:
> {code:java}
> // TableOutputFormat.checkOutputSpecs (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java), HBase 1.4.0
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
>     InterruptedException {
>   try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
>     TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
>     if (!admin.tableExists(tableName)) {
>       throw new TableNotFoundException("Can't write, table does not exist:" +
>           tableName.getNameAsString());
>     }
>     if (!admin.isTableEnabled(tableName)) {
>       throw new TableNotEnabledException("Can't write, table is not enabled: " +
>           tableName.getNameAsString());
>     }
>   }
> }
> {code}
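For concreteness, a minimal Spark driver along the following lines ends up in the write path excerpted above and, with HBase 1.4.0 on the classpath, hits the failing checkOutputSpecs call. This is only a sketch; the table name "test_table", column family "cf", qualifier "col", and object name are hypothetical, not from the original report:

{code:scala}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object HBaseWriteExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-write-example"))

    // Point TableOutputFormat at the destination table (hypothetical name).
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table")
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Build (rowkey, Put) pairs for the new-API Hadoop writer.
    val puts = sc.parallelize(Seq("row1" -> "value1", "row2" -> "value2")).map {
      case (rowKey, value) =>
        val put = new Put(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
        (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
    }

    // With HBase 1.4.0 this call reaches TableOutputFormat.checkOutputSpecs via
    // SparkHadoopMapReduceWriter.write and throws the NullPointerException,
    // because Spark instantiates the format with newInstance and never calls setConf.
    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
{code}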
"ConnectionFactory.createConnection(getConf())", as mentioned above "conf" > class member is not initialized, so getConf() will return null, so in the > next UserProvider create instance process, it throw the > NullPointException(Please part of stack trace at the end), it is a little > confused that, context passed by function parameter is actually been properly > constructed, and it contains Configuration object, why context is never used? > So I suggest to use below code to partly fix this issue: > > {code:java} > // code placeholder > @Override > public void checkOutputSpecs(JobContext context) throws IOException, > InterruptedException { > Configuration hConf = context.getConfiguration(); > if(hConf == null) > hConf = this.conf; > try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) { > TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE)); > if (!admin.tableExists(tableName)) { > throw new TableNotFoundException("Can't write, table does not exist:" + > tableName.getNameAsString()); > } > if (!admin.isTableEnabled(tableName)) { > throw new TableNotEnabledException("Can't write, table is not enabled: > " + > tableName.getNameAsString()); > } > } > } > {code} > In hbase 1.3.1, this issue is not exists because checkOutputSpecs has a blank > function body > > > Part of stack trace: > Exception in thread "main" java.lang.NullPointerException > at > org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122) > at > org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214) > at > org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119) > at > org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)