Hi Abel,

Yes, you need to either include the empty key value in each row you write,
or declare your table as a view instead of a table (in which case it'd be
read-only).

Thanks,
James
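P.S. If you keep it as a table and add the empty key value yourself, the fix
is one extra cell per row before the bulk load. This is only a rough,
untested sketch: it assumes your table uses the default column family and
the empty-column constants in org.apache.phoenix.query.QueryConstants, and
withEmptyKeyValue is just a helper name I made up; use your own family if
the table declares one explicitly.

import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
import org.apache.phoenix.query.QueryConstants

// Adds Phoenix's "empty" marker cell to a row built for bulk loading.
// Without that cell the row doesn't show up in a SELECT, even though a
// raw HBase scan still shows its data cells.
def withEmptyKeyValue(
    row: (ByteArrayWrapper, FamiliesQualifiersValues)): (ByteArrayWrapper, FamiliesQualifiersValues) = {
  val (key, familyValues) = row
  familyValues.+=(
    QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, // assumes default family "0"
    QueryConstants.EMPTY_COLUMN_BYTES,          // the "_0" qualifier
    QueryConstants.EMPTY_COLUMN_VALUE_BYTES)
  (key, familyValues)
}

// e.g. apply it once per row key, after your toHFile/reduceByKey step and
// before hbaseBulkLoadThinRows:
// val withEmpty = sortedRdd.map(withEmptyKeyValue)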
On Wed, Apr 27, 2016 at 12:17 PM, Abel Fernández <mevsmys...@gmail.com> wrote:
> Hi,
>
> I am trying to bulk load data into Apache Phoenix using HFiles. I do not
> have a CSV, so I need to build the HFiles from an RDD.
>
> My problem is that I cannot see the data through the Phoenix API
> (select * from ...), but when I scan the table in HBase I do see it.
>
> Do I need to include the empty column?
>
> This is the code I am using:
>
> class ExtendedProductRDDFunctions[A <: scala.Product](data: org.apache.spark.rdd.RDD[A])
>   extends ProductRDDFunctions[A](data) with Serializable with Logging {
>
> Create the Hfiles:
> -------------------
>   def toHFile(
>       sc: SparkContext,
>       tableName: String,
>       columns: Seq[String],
>       conf: Configuration = new Configuration,
>       zkUrl: Option[String] = None
>   ): RDD[(ByteArrayWrapper, FamiliesQualifiersValues)] = {
>
>     val config = ConfigurationUtil.getOutputConfiguration(tableName, columns, zkUrl, Some(conf))
>     val tableBytes = Bytes.toBytes(tableName)
>     ConfigurationUtil.encodeColumns(config)
>     val jdbcUrl = zkUrl.map(getJdbcUrl).getOrElse(getJdbcUrl(config))
>     val query = QueryUtil.constructUpsertStatement(tableName, columns.toList.asJava, null)
>
>     val columnsInfo = ConfigurationUtil.decodeColumns(config)
>     val a = sc.broadcast(columnsInfo)
>
>     logInfo("toHFile data size: " + data.count())
>     data.flatMap(x => mapRow(x, jdbcUrl, tableBytes, query, a.value))
>   }
>
>   def mapRow(product: Product,
>              jdbcUrl: String,
>              tableBytes: Array[Byte],
>              query: String,
>              columnsInfo: List[ColumnInfo]): List[(ByteArrayWrapper, FamiliesQualifiersValues)] = {
>
>     val conn = DriverManager.getConnection(jdbcUrl)
>     var hRows: Iterator[(ByteArrayWrapper, FamiliesQualifiersValues)] = null
>     val preparedStatement = conn.prepareStatement(query)
>
>     columnsInfo.zip(product.productIterator.toList).zipWithIndex.foreach(setInStatement(preparedStatement))
>     preparedStatement.execute()
>
>     val uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(conn, true)
>     hRows = uncommittedDataIterator.asScala
>       .flatMap(kvPair => kvPair.getSecond.asScala.map(kf => createPut(kf)))
>
>     conn.rollback()
>     hRows.toList
>   }
>
>   private def createPut(keyValue: KeyValue): (ByteArrayWrapper, FamiliesQualifiersValues) = {
>
>     val key = new ByteArrayWrapper(keyValue.getRow)
>     val family = new FamiliesQualifiersValues
>
>     family.+=(keyValue.getFamily, keyValue.getQualifier, keyValue.getValue)
>     (key, family)
>   }
> }
>
> Load into Apache Phoenix
> -------------------------
> val sortedRdd = rdd
>   .keyBy(k => k._1.toString)
>   .reduceByKey((key, value) => value)
>   .map(v => v._2)
>
> def apacheBulkSave(hBaseContext: HBaseContext, table: String, outputPath: String) = {
>   rdd.hbaseBulkLoadThinRows(hBaseContext,
>     TableName.valueOf(table),
>     f => f,
>     outputPath
>   )
> }
>
> --
> Un saludo - Best Regards.
> Abel
>
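And if you go the view route instead, the DDL just maps the existing HBase
table from Phoenix; the view is read-only, and the missing empty cell stops
being a problem. A rough, untested sketch, where "MY_TABLE", PK and
"0"."VAL" are placeholders for your actual table name, row key and column
family/qualifier:

import java.sql.DriverManager

// jdbcUrl: the same Phoenix JDBC URL you already build in toHFile
val conn = DriverManager.getConnection(jdbcUrl)
try {
  // Declare a read-only view over the HBase table the bulk load wrote to.
  conn.createStatement().execute(
    """CREATE VIEW "MY_TABLE" (
      |  PK VARCHAR PRIMARY KEY,
      |  "0"."VAL" VARCHAR
      |)""".stripMargin)
} finally {
  conn.close()
}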