Hi, I am trying to load files in Apache Phoenix using HFiles. I do not have a csv so I need to load the Hfiles from a RDD.
My problem is that I am not able to see the files using the apache api (select * from...) but when I do a scan of the table I see the files. Do I need to include the empty column? This is the code I am using: class ExtendedProductRDDFunctions[A <: scala.Product](data: org.apache.spark.rdd.RDD[A]) extends ProductRDDFunctions[A](data) with Serializable with Logging { Create the Hfiles: ------------------- def toHFile( sc: SparkContext, tableName: String, columns: Seq[String], conf: Configuration = new Configuration, zkUrl: Option[String] = None ): RDD[(ByteArrayWrapper, FamiliesQualifiersValues)] = { val config = ConfigurationUtil.getOutputConfiguration(tableName, columns, zkUrl, Some(conf)) val tableBytes = Bytes.toBytes(tableName) ConfigurationUtil.encodeColumns(config) val jdbcUrl = zkUrl.map(getJdbcUrl).getOrElse(getJdbcUrl(config)) val query = QueryUtil.constructUpsertStatement(tableName, columns.toList.asJava, null) val columnsInfo = ConfigurationUtil.decodeColumns(config) val a = sc.broadcast(columnsInfo) logInfo("toHFile data size: "+data.count()) data.flatMap(x => mapRow(x, jdbcUrl, tableBytes, query, a.value)) } def mapRow(product: Product, jdbcUrl: String, tableBytes: Array[Byte], query: String, columnsInfo: List[ColumnInfo]): List[(ByteArrayWrapper, FamiliesQualifiersValues)] = { val conn = DriverManager.getConnection(jdbcUrl) var hRows:Iterator[(ByteArrayWrapper, FamiliesQualifiersValues)] = null val preparedStatement = conn.prepareStatement(query) columnsInfo.zip(product.productIterator.toList).zipWithIndex.foreach(setInStatement(preparedStatement)) preparedStatement.execute() val uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(conn, true) hRows = uncommittedDataIterator.asScala .flatMap(kvPair => kvPair.getSecond.asScala.map(kf => createPut(kf))) conn.rollback() hRows.toList } private def createPut(keyValue: KeyValue):(ByteArrayWrapper,FamiliesQualifiersValues)={ val key = new ByteArrayWrapper(keyValue.getRow) val family = new FamiliesQualifiersValues family.+=(keyValue.getFamily,keyValue.getQualifier,keyValue.getValue) (key,family) } } Load into Apache Phoenix ------------------------- val sortedRdd = rdd .keyBy(k => k._1.toString) .reduceByKey((key,value) => value) .map(v => v._2) def apacheBulkSave(hBaseContext: HBaseContext, table: String,outputPath: String) ={ rdd.hbaseBulkLoadThinRows(hBaseContext, TableName.valueOf(table), f => f, outputPath ) } -- Un saludo - Best Regards. Abel