Hi,

Over on HBase we are working on direct integration with Spark. That may (or may not) be a better option when you already have code using the HBase API directly. Please see https://issues.apache.org/jira/browse/HBASE-13992
On Saturday, July 18, 2015, Josh Mahonin <[email protected]> wrote:

> Hi,
>
> The phoenix-spark integration is a thin wrapper around the
> phoenix-mapreduce integration, which under the hood just uses Phoenix's
> 'UPSERT' functionality for saving. As far as I know, there's no provisions
> for checkAndPut functionality there, so if you require it, I suggest
> sticking to the HBase API for now.
>
> Re: timestamp, the phoenix-spark API only supports using the native
> Phoenix time formats, i.e. DATE, TIMESTAMP.
>
> Good luck,
>
> Josh
>
> On Fri, Jul 17, 2015 at 11:40 PM, kaye ann <[email protected]> wrote:
>
>> I am using Spark 1.3, HBase 1.1 and Phoenix 4.4. I have this in my code:
>>
>> val rdd = processedRdd.map(r => Row.fromSeq(r))
>> val dataframe = sqlContext.createDataFrame(rdd, schema)
>> dataframe.save("org.apache.phoenix.spark", SaveMode.Overwrite,
>>   Map("table" -> HTABLE, "zkUrl" -> zkQuorum))
>>
>> This code works, but...
>> 1. How do I implement HBase's checkAndPut using Phoenix-Spark API?
>> CREATED_DATE is always set to DateTime.now() in the dataframe.
>> I don't want the field to be updated if the row already exists in HBase,
>> yet there's an update in other fields.
>> I can achieve it using HBase's checkAndPut: Put all the fields and use
>> checkAndPut on created_date field.
>> 2. How do I add an HBase Timestamp using Phoenix-Spark similar to HBase
>> API:
>> Put(rowkey, timestamp.getMillis)
>> -----------------
>> This is my code using HBase API that I am trying to convert to
>> Phoenix-Spark since I think Phoenix-Spark is more optimized:
>>
>> rdd.foreachPartition(p => {
>>   val conf = HBaseConfiguration.create()
>>   val hTable = new HTable(conf, HTABLE)
>>   hTable.setAutoFlushTo(false)
>>
>>   p.foreach(r => {
>>     val hTimestamp = ...
>>     val rowkey = ...
>>
>>     val hRow = new Put(rowkey, hTimestamp.getMillis)
>>     r.filter(...).foreach(tuple =>
>>       hRow.add(toBytes(tuple._1), toBytes(tuple._2), toBytes(tuple._3))
>>     )
>>     hTable.put(hRow)
>>
>>     val CREATED_DATE_PUT = new Put(rowkey, hTimestamp.getMillis)
>>       .add(toBytes(CF), toBytes(CREATED_DATE), toBytes(now))
>>     hTable.checkAndPut(rowkey, toBytes(CF), toBytes(CREATED_DATE), null,
>>       CREATED_DATE_PUT)
>>
>>   })
>>   hTable.flushCommits()
>>   hTable.close()
>> })
>>
>>
>

--
Best regards,

   - Andy

Problems worthy of attack prove their worth by hitting back. - Piet Hein (via Tom White)
