Michael Jin created HBASE-20295:
-----------------------------------

             Summary: TableOutputFormat.checkOutputSpecs throws NullPointerException
                 Key: HBASE-20295
                 URL: https://issues.apache.org/jira/browse/HBASE-20295
             Project: HBase
          Issue Type: Bug
          Components: Client
    Affects Versions: 1.4.0
         Environment: Spark 2.2.1, HBase 1.4.0
            Reporter: Michael Jin


I am using Spark to write data to HBase through RDD.saveAsNewAPIHadoopDataset. It works fine with HBase 1.3.1, but after updating my HBase dependency to 1.4.0 in pom.xml it throws java.lang.NullPointerException. The cause is a logic error in the TableOutputFormat.checkOutputSpecs function; please check the details below.
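For reference, here is a minimal reproduction sketch. The class name, table name, column family/qualifier, and the local master URL are placeholders, not taken from my actual job:
{code:java}
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class HBaseWriteRepro {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "hbase-write-repro");

    // Point TableOutputFormat at the target table ("test_table" is a placeholder).
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "test_table");
    Job job = Job.getInstance(conf);
    job.setOutputFormatClass(TableOutputFormat.class);

    // Build one Put per input row inside the task, the usual pattern.
    JavaPairRDD<ImmutableBytesWritable, Put> rdd =
        sc.parallelize(Collections.singletonList("row1")).mapToPair(row -> {
          Put put = new Put(Bytes.toBytes(row));
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          return new Tuple2<>(new ImmutableBytesWritable(put.getRow()), put);
        });

    // Works against HBase 1.3.1; with 1.4.0 on the classpath this throws
    // java.lang.NullPointerException from TableOutputFormat.checkOutputSpecs.
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
  }
}
{code}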

First, let's take a look at the SparkHadoopMapReduceWriter.write function in SparkHadoopMapReduceWriter.scala:
{code:java}
// SparkHadoopMapReduceWriter.write (org.apache.spark.internal.io.SparkHadoopMapReduceWriter.scala)

def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    hadoopConf: Configuration): Unit = {
  // Extract context and configuration from RDD.
  val sparkContext = rdd.context
  val stageId = rdd.id
  val sparkConf = rdd.conf
  val conf = new SerializableConfiguration(hadoopConf)

  // Set up a job.
  val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
  val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
  val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
  val format = jobContext.getOutputFormatClass

  if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
    // FileOutputFormat ignores the filesystem parameter
    val jobFormat = format.newInstance
    jobFormat.checkOutputSpecs(jobContext)
  }

  val committer = FileCommitProtocol.instantiate(
    className = classOf[HadoopMapReduceCommitProtocol].getName,
    jobId = stageId.toString,
    outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
    isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
  committer.setupJob(jobContext)
...{code}
in "write" function if output spec validation is enabled, it will call 
checkOutputSpec function in TableOutputFormat class, but the job format is 
simply created by "vall jobFormat = format.newInstance", this will NOT 
initialize "conf" member variable in TableOutputFormat class, let's continue 
check checkOutputSpecs function in TableOutputFormat class
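TableOutputFormat implements org.apache.hadoop.conf.Configurable, so its "conf" field is normally injected through setConf() right after instantiation; a bare newInstance bypasses that step entirely. Below is a small sketch of the difference (the class is made up for illustration, and "test_table" is a placeholder):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ConfInitSketch {
  public static void main(String[] args) throws Exception {
    // What SparkHadoopMapReduceWriter.write does: a bare newInstance.
    // Nothing ever calls setConf(), so the "conf" field stays null.
    TableOutputFormat<?> bare = TableOutputFormat.class.newInstance();
    System.out.println(bare.getConf()); // null -> NPE later in createConnection()

    // What Hadoop's own factory does instead: ReflectionUtils detects the
    // Configurable interface and calls setConf() immediately after creation.
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "test_table"); // placeholder name
    TableOutputFormat<?> wired =
        ReflectionUtils.newInstance(TableOutputFormat.class, conf);
    System.out.println(wired.getConf()); // non-null, as checkOutputSpecs expects
  }
}
{code}
With "conf" left null on the Spark code path, let's continue with the checkOutputSpecs function in the TableOutputFormat class: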

 
{code:java}
// TableOutputFormat.checkOutputSpecs (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java) HBASE 1.4.0
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
    InterruptedException {

  try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
    TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
    if (!admin.tableExists(tableName)) {
      throw new TableNotFoundException("Can't write, table does not exist:" +
          tableName.getNameAsString());
    }

    if (!admin.isTableEnabled(tableName)) {
      throw new TableNotEnabledException("Can't write, table is not enabled: " +
          tableName.getNameAsString());
    }
  }
}
{code}
 

"ConnectionFactory.createConnection(getConf())", as mentioned above "conf" 
class member is not initialized, so getConf() will return null, so in the next 
UserProvider create instance process, it throw the NullPointException(Please 
part of stack trace at the end), it is a little confused that, context passed 
by function parameter is actually been properly constructed, and it contains 
Configuration object, why context is never used? So I suggest to use below code 
to partly fix this issue:

 
{code:java}
// Proposed fix: prefer the Configuration carried by the JobContext,
// falling back to this.conf when the context has none
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
    InterruptedException {
  Configuration hConf = context.getConfiguration();
  if (hConf == null) {
    hConf = this.conf;
  }

  try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
    TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
    if (!admin.tableExists(tableName)) {
      throw new TableNotFoundException("Can't write, table does not exist:" +
              tableName.getNameAsString());
    }

    if (!admin.isTableEnabled(tableName)) {
      throw new TableNotEnabledException("Can't write, table is not enabled: " +
              tableName.getNameAsString());
    }
  }
}
{code}
In HBase 1.3.1 this issue does not exist, because checkOutputSpecs has an empty function body.
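For comparison, the checkOutputSpecs override in 1.3.1 is effectively a no-op (sketched below from the 1.3.1 source), so the uninitialized "conf" is never dereferenced:
{code:java}
// TableOutputFormat.checkOutputSpecs (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java) HBASE 1.3.1
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
    InterruptedException {
}
{code}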

Part of the stack trace:

Exception in thread "main" java.lang.NullPointerException
 at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
 at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
 at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
 at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
 at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
 at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
