Hi,

I have trade data stored in Hbase table. Data arrives in csv format to HDFS
and then loaded into Hbase via periodic load with
org.apache.hadoop.hbase.mapreduce.ImportTsv.

The Hbase table has one Column family "trade_info" and three columns:
ticker, timecreated, price.

The RowKey is UUID. So each row has UUID, ticker, timecreated and price in
the csv file

Each row in Hbase is a key, value map. In my case, I have one Column Family
and three columns. Without going into semantics I see Hbase as a column
oriented database where column data stay together.

So I thought of this way of accessing the data.

I define an RDD for each column in the column family as below. In this case
column trade_info:ticker

//create rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf,
classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
val rdd1 = hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow,
result.getColumn("price_info".getBytes(), "ticker".getBytes()))).map(row =>
{
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toChar).mkString
)
})
case class columns (key: String, ticker: String)
val dfticker = rdd1.toDF.map(p => columns(p(0).toString,p(1).toString))

Note that the end result is a DataFrame with the RowKey -> key and column
-> ticker

I use the same approach to create two other DataFrames, namely dftimecreated
and dfprice for the two other columns.

Note that if I don't need a column, then I do not create a DF for it. So a
DF with each column I use. I am not sure how this compares if I read the
full row through other methods if any.

Anyway all I need to do after creating a DataFrame for each column is to
join themthrough RowKey to slice and dice data. Like below.

Get me the latest prices ordered by timecreated and ticker (ticker is stock)

val rs =
dfticker.join(dftimecreated,"key").join(dfprice,"key").orderBy('timecreated
desc, 'price desc).select('timecreated, 'ticker,
'price.cast("Float").as("Latest price"))
rs.show(10)

+-------------------+------+------------+
|        timecreated|ticker|Latest price|
+-------------------+------+------------+
|2016-10-16T18:44:57|   S16|   97.631966|
|2016-10-16T18:44:57|   S13|    92.11406|
|2016-10-16T18:44:57|   S19|    85.93021|
|2016-10-16T18:44:57|   S09|   85.714645|
|2016-10-16T18:44:57|   S15|    82.38932|
|2016-10-16T18:44:57|   S17|    80.77747|
|2016-10-16T18:44:57|   S06|    79.81854|
|2016-10-16T18:44:57|   S18|    74.10128|
|2016-10-16T18:44:57|   S07|    66.13622|
|2016-10-16T18:44:57|   S20|    60.35727|
+-------------------+------+------------+
only showing top 10 rows

Is this a workable solution?

Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Reply via email to