DF.toJavaRDD.rdd.hbaseBulkLoadThinRows(hbaseContext,
  TableName.valueOf(config.getString("table")),
  R => {
    val rowKey = Bytes.toBytes(R.getAs[String](name))
    // collect all cells of this row into one FamiliesQualifiersValues
    val familyQualifiersValues = new FamiliesQualifiersValues
    val family = Bytes.toBytes(_family)
    val qualifier = Bytes.toBytes(name)
    val value: Array[Byte] = Bytes.toBytes(R.getAs[String](name))
    familyQualifiersValues += (family, qualifier, value)
    (new ByteArrayWrapper(rowKey), familyQualifiersValues)
  },
  config.getString("tmp"))
val table =
connection.getTable(TableName.valueOf(config.getString("table")))
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(new Path(config.getString("tmp")),
connection.getAdmin, table,
connection.getRegionLocator(TableName.valueOf(config.getString("table"))))
}
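For reference, my understanding of the shape hbaseBulkLoadThinRows expects: the mapping function returns one (ByteArrayWrapper, FamiliesQualifiersValues) pair per row, with all of that row's cells folded into the FamiliesQualifiersValues. A minimal standalone sketch of that shape (the row data here is made up):

import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
import org.apache.hadoop.hbase.util.Bytes

// illustrative input: (rowKey, cells as (family, qualifier, value))
val row: (String, Seq[(String, String, String)]) =
  ("0001", Seq(("f", "c1", "v1"), ("f", "c2", "v2")))

val fqv = new FamiliesQualifiersValues
row._2.foreach { case (family, qualifier, value) =>
  // += appends one cell to this thin row
  fqv += (Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
}
val thinRow = (new ByteArrayWrapper(Bytes.toBytes(row._1)), fqv)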
I get an error:
21/08/19 15:12:22 INFO LoadIncrementalHFiles: Split occurred while grouping
HFiles, retry attempt 9 with 1 files remaining to group or split
21/08/19 15:12:22 INFO LoadIncrementalHFiles: Trying to load
hfile=file:/d:/tmp/f/bb4706276d5d40c5b3014cc74dc39ddd first=Optional[0001]
last=Optional[0003]
21/08/19 15:12:22 WARN LoadIncrementalHFiles: Attempt to bulk load region
containing into table sparktest1 with files [family:f
path:file:/d:/tmp/f/bb4706276d5d40c5b3014cc74dc39ddd] failed. This is
recoverable and they will be retried.
21/08/19 15:12:22 INFO LoadIncrementalHFiles: Split occurred while grouping
HFiles, retry attempt 10 with 1 files remaining to group or split
21/08/19 15:12:22 ERROR LoadIncrementalHFiles:
-------------------------------------------------
Bulk load aborted with some files not yet loaded:
-------------------------------------------------
file:/d:/tmp/f/bb4706276d5d40c5b3014cc74dc39ddd
Exception in thread "main" java.io.IOException: Retry attempted 10 times without completing, bailing out
    at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.performBulkLoad(LoadIncrementalHFiles.java:419)
    at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:342)
    at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:256)
    at com.join.hbase.writer.HbaseWriter.saveTo(HbaseWriter.scala:167)
    at com.join.Synctool$.main(Synctool.scala:587)
    at com.join.Synctool.main(Synctool.scala)
The file file:/d:/tmp/f/bb4706276d5d40c5b3014cc74dc39ddd does exist, so the
hbaseBulkLoadThinRows step itself seems to be OK.
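To double-check that the generated HFile is readable, it can be inspected with HBase's HFilePrettyPrinter tool, for example with this small sketch (the path is the one from the log above):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter
import org.apache.hadoop.util.ToolRunner

object InspectHFile {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    // -m prints the HFile metadata (first/last key etc.), -p prints the cells
    val rc = ToolRunner.run(conf, new HFilePrettyPrinter(),
      Array("-m", "-p", "-f", "file:/d:/tmp/f/bb4706276d5d40c5b3014cc74dc39ddd"))
    sys.exit(rc)
  }
}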
On the official website I found this example:
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family:Array[Byte] = t._2(0)._1
val qualifier = t._2(0)._2
val value = t._2(0)._3
val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder.getPath)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
In that example the path given to hbaseBulkLoad and to LoadIncrementalHFiles is
the same (stagingFolder.getPath), and my code does the same with
config.getString("tmp"); I expected hbaseBulkLoad to work with a local file path.
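One thing I am not sure about: my staging folder is on the local d: drive (file:/d:/tmp), and if the cluster is not local the region servers would have to be able to read those staging files themselves. For comparison, a minimal sketch of the load step with the staging folder on HDFS instead (hdfs:///tmp/bulk-staging is only an illustrative path, not my real config):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles

val stagingDir = "hdfs:///tmp/bulk-staging"  // illustrative path only
val conf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(conf)
val tn = TableName.valueOf("sparktest1")
val table = conn.getTable(tn)
try {
  // hbaseBulkLoadThinRows(..., stagingDir) would write the HFiles here first,
  // then the same path is handed to LoadIncrementalHFiles:
  new LoadIncrementalHFiles(conf).doBulkLoad(
    new Path(stagingDir), conn.getAdmin, table, conn.getRegionLocator(tn))
} finally {
  table.close()
  conn.close()
}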
igyu