Hi,
I am trying to load data into Apache Phoenix using HFiles. I do not have a
CSV, so I need to build the HFiles from an RDD.
My problem is that I cannot see the rows through the Phoenix API
(SELECT * FROM ...), but when I scan the table directly in HBase the data
is there.
Do I need to include the empty column?
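(By "the empty column" I mean Phoenix's marker cell with qualifier "_0". If
it is required, I assume I would have to emit something like the sketch
below for every row. This is untested; the QueryConstants names and the
default column family "0" are assumptions on my part.)

import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
import org.apache.phoenix.query.QueryConstants

// Untested sketch: append Phoenix's empty marker cell ("_0" -> "x") to a
// row, assuming the table uses the default column family "0".
def addEmptyColumn(key: ByteArrayWrapper,
                   cells: FamiliesQualifiersValues): (ByteArrayWrapper, FamiliesQualifiersValues) = {
  cells += (QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
            QueryConstants.EMPTY_COLUMN_BYTES,
            QueryConstants.EMPTY_COLUMN_VALUE_BYTES)
  (key, cells)
}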
This is the code I am using:
class ExtendedProductRDDFunctions[A <: scala.Product](data: org.apache.spark.rdd.RDD[A])
    extends ProductRDDFunctions[A](data) with Serializable with Logging {
Create the HFiles:
------------------
def toHFile(sc: SparkContext,
            tableName: String,
            columns: Seq[String],
            conf: Configuration = new Configuration,
            zkUrl: Option[String] = None): RDD[(ByteArrayWrapper, FamiliesQualifiersValues)] = {
  val config = ConfigurationUtil.getOutputConfiguration(tableName, columns, zkUrl, Some(conf))
  val tableBytes = Bytes.toBytes(tableName)
  ConfigurationUtil.encodeColumns(config)
  val jdbcUrl = zkUrl.map(getJdbcUrl).getOrElse(getJdbcUrl(config))
  val query = QueryUtil.constructUpsertStatement(tableName, columns.toList.asJava, null)
  val columnsInfo = ConfigurationUtil.decodeColumns(config)
  // Broadcast the column metadata so each executor reuses one copy.
  val broadcastColumns = sc.broadcast(columnsInfo)
  logInfo("toHFile data size: " + data.count())
  data.flatMap(x => mapRow(x, jdbcUrl, tableBytes, query, broadcastColumns.value))
}
def mapRow(product: Product,
           jdbcUrl: String,
           tableBytes: Array[Byte],
           query: String,
           columnsInfo: List[ColumnInfo]): List[(ByteArrayWrapper, FamiliesQualifiersValues)] = {
  // Note: this opens one JDBC connection per row, which is expensive.
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val preparedStatement = conn.prepareStatement(query)
    columnsInfo.zip(product.productIterator.toList).zipWithIndex
      .foreach(setInStatement(preparedStatement))
    preparedStatement.execute()
    // Materialise the KeyValues before rolling back: the iterator is lazy,
    // and rollback() clears the mutation state it reads from.
    val hRows = PhoenixRuntime.getUncommittedDataIterator(conn, true).asScala
      .flatMap(kvPair => kvPair.getSecond.asScala.map(kf => createPut(kf)))
      .toList
    conn.rollback()
    hRows
  } finally {
    conn.close()
  }
}
private def createPut(keyValue: KeyValue): (ByteArrayWrapper, FamiliesQualifiersValues) = {
  val key = new ByteArrayWrapper(keyValue.getRow)
  val family = new FamiliesQualifiersValues
  family += (keyValue.getFamily, keyValue.getQualifier, keyValue.getValue)
  (key, family)
}
}
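For context, the rdd used below is produced by the class above, roughly
like this (the input RDD, table and column names are placeholders):

val input: RDD[(String, String)] = ??? // my source data, one tuple per row
val rdd = new ExtendedProductRDDFunctions(input)
  .toHFile(sc, "MY_TABLE", Seq("ID", "VAL"))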
Load into Apache Phoenix
-------------------------
// Keep a single (row key, cells) pair per key. Note that mapRow emits one
// pair per KeyValue, so this keeps only one cell per row key.
val sortedRdd = rdd
  .keyBy(k => k._1.toString)
  .reduceByKey((_, value) => value)
  .map(v => v._2)
def apacheBulkSave(hBaseContext: HBaseContext,
                   table: String,
                   outputPath: String) = {
  sortedRdd.hbaseBulkLoadThinRows(hBaseContext,
    TableName.valueOf(table),
    f => f, // identity: rows are already (ByteArrayWrapper, FamiliesQualifiersValues)
    outputPath)
}
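For completeness, once the HFiles are in outputPath I hand them to HBase
with LoadIncrementalHFiles, roughly like this (sketch from memory; the
exact doBulkLoad overload depends on the HBase version):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

def completeBulkLoad(conf: Configuration, table: String, outputPath: String): Unit = {
  val connection = ConnectionFactory.createConnection(conf)
  try {
    val tableName = TableName.valueOf(table)
    // Move the generated HFiles into the table's regions.
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path(outputPath),
      connection.getAdmin,
      connection.getTable(tableName),
      connection.getRegionLocator(tableName))
  } finally {
    connection.close()
  }
}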
--
Best regards,
Abel