Hi,

I am trying to load files in Apache Phoenix using HFiles. I do not have a
csv so I need to load the Hfiles from a RDD.

My problem is that I am not able to see the files using the apache api
(select * from...) but when I do a scan of the table I see the files.

Do I need to include the empty column?

This is the code I am using:

class ExtendedProductRDDFunctions[A <: scala.Product](data:
org.apache.spark.rdd.RDD[A]) extends
  ProductRDDFunctions[A](data) with Serializable with Logging {

Create the Hfiles:
-------------------
  def toHFile(
              sc: SparkContext,
              tableName: String,
              columns: Seq[String],
              conf: Configuration = new Configuration,
              zkUrl: Option[String] = None
              ): RDD[(ByteArrayWrapper, FamiliesQualifiersValues)] = {

    val config = ConfigurationUtil.getOutputConfiguration(tableName,
columns, zkUrl, Some(conf))
    val tableBytes = Bytes.toBytes(tableName)
    ConfigurationUtil.encodeColumns(config)
    val jdbcUrl = zkUrl.map(getJdbcUrl).getOrElse(getJdbcUrl(config))
    val query = QueryUtil.constructUpsertStatement(tableName,
columns.toList.asJava, null)

    val columnsInfo = ConfigurationUtil.decodeColumns(config)
    val a = sc.broadcast(columnsInfo)

    logInfo("toHFile data size: "+data.count())
    data.flatMap(x => mapRow(x, jdbcUrl, tableBytes, query, a.value))
  }

  def mapRow(product: Product,
             jdbcUrl: String,
             tableBytes: Array[Byte],
             query: String,
             columnsInfo: List[ColumnInfo]): List[(ByteArrayWrapper,
FamiliesQualifiersValues)] = {

    val conn = DriverManager.getConnection(jdbcUrl)
    var hRows:Iterator[(ByteArrayWrapper, FamiliesQualifiersValues)] = null
    val preparedStatement = conn.prepareStatement(query)


columnsInfo.zip(product.productIterator.toList).zipWithIndex.foreach(setInStatement(preparedStatement))
    preparedStatement.execute()

    val uncommittedDataIterator =
PhoenixRuntime.getUncommittedDataIterator(conn, true)
    hRows = uncommittedDataIterator.asScala
      .flatMap(kvPair => kvPair.getSecond.asScala.map(kf => createPut(kf)))

    conn.rollback()
    hRows.toList
  }

  private def createPut(keyValue:
KeyValue):(ByteArrayWrapper,FamiliesQualifiersValues)={

    val key = new ByteArrayWrapper(keyValue.getRow)
    val family = new FamiliesQualifiersValues

    family.+=(keyValue.getFamily,keyValue.getQualifier,keyValue.getValue)
    (key,family)
  }
  }

  Load into Apache Phoenix
  -------------------------
  val sortedRdd = rdd
          .keyBy(k => k._1.toString)
          .reduceByKey((key,value) => value)
          .map(v => v._2)

  def apacheBulkSave(hBaseContext: HBaseContext, table: String,outputPath:
String) ={
    rdd.hbaseBulkLoadThinRows(hBaseContext,
      TableName.valueOf(table),
      f => f,
      outputPath
    )
  }

-- 
Un saludo - Best Regards.
Abel

Reply via email to