Re: HBase's checkAndPut, Timestamp in Phoenix-Spark API

2015-07-19 Thread Andrew Purtell
Hi,

Over on HBase we are working on direct integration with Spark. That may (or
may not) be a better option when you already have code using the HBase API
directly. Please see https://issues.apache.org/jira/browse/HBASE-13992


On Saturday, July 18, 2015, Josh Mahonin  wrote:

> Hi,
>
> The phoenix-spark integration is a thin wrapper around the
> phoenix-mapreduce integration, which under the hood just uses Phoenix's
> 'UPSERT' functionality for saving. As far as I know, there are no provisions
> for checkAndPut functionality there, so if you require it, I suggest
> sticking to the HBase API for now.
>
> Re: timestamp, the phoenix-spark API only supports using the native
> Phoenix time formats, i.e. DATE and TIMESTAMP.
>
> Good luck,
>
> Josh
>
> On Fri, Jul 17, 2015 at 11:40 PM, kaye ann wrote:
>
>> I am using Spark 1.3, HBase 1.1 and Phoenix 4.4. I have this in my code:
>>
>> val rdd = processedRdd.map(r => Row.fromSeq(r))
>> val dataframe = sqlContext.createDataFrame(rdd, schema)
>> dataframe.save("org.apache.phoenix.spark", SaveMode.Overwrite,
>> Map("table" -> HTABLE, "zkUrl" -> zkQuorum))
>>
>> This code works, but...
>> 1. How do I implement HBase's checkAndPut using the Phoenix-Spark API?
>> CREATED_DATE is always set to DateTime.now() in the dataframe.
>> I don't want that field to be updated when the row already exists in HBase
>> but other fields are being updated.
>> With the HBase API I can achieve this by putting all the fields and using
>> checkAndPut on the created_date field.
>> 2. How do I set an HBase timestamp using Phoenix-Spark, similar to the
>> HBase API:
>> Put(rowkey, timestamp.getMillis)
>> -
>> This is my code using the HBase API that I am trying to convert to
>> Phoenix-Spark, since I think Phoenix-Spark is better optimized:
>>
>> rdd.foreachPartition(p => {
>>   val conf = HBaseConfiguration.create()
>>   val hTable = new HTable(conf, HTABLE)
>>   hTable.setAutoFlushTo(false)
>>
>>   p.foreach(r => {
>>     val hTimestamp = ...
>>     val rowkey = ...
>>
>>     val hRow = new Put(rowkey, hTimestamp.getMillis)
>>     r.filter(...).foreach(tuple =>
>>       hRow.add(toBytes(tuple._1), toBytes(tuple._2), toBytes(tuple._3))
>>     )
>>     hTable.put(hRow)
>>
>>     val CREATED_DATE_PUT = new Put(rowkey, hTimestamp.getMillis)
>>       .add(toBytes(CF), toBytes(CREATED_DATE), toBytes(now))
>>     hTable.checkAndPut(rowkey, toBytes(CF), toBytes(CREATED_DATE), null,
>>       CREATED_DATE_PUT)
>>
>>   })
>>   hTable.flushCommits()
>>   hTable.close()
>> })
>>
>>
>

-- 
Best regards,

   - Andy

Problems worthy of attack prove their worth by hitting back. - Piet Hein
(via Tom White)


Re: HBase's checkAndPut, Timestamp in Phoenix-Spark API

2015-07-18 Thread Josh Mahonin
Hi,

The phoenix-spark integration is a thin wrapper around the
phoenix-mapreduce integration, which under the hood just uses Phoenix's
'UPSERT' functionality for saving. As far as I know, there are no provisions
for checkAndPut functionality there, so if you require it, I suggest
sticking to the HBase API for now.
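
To make the "thin wrapper" concrete, a save through phoenix-spark boils down
to something like the sketch below (the table, columns, and the shape of the
RDD of tuples are hypothetical). Each element is written with an unconditional
UPSERT, which is why there is no place to hang checkAndPut-style logic:

import org.apache.phoenix.spark._

// Effectively issues, per row:
//   UPSERT INTO MY_TABLE (ID, COL1, CREATED_DATE) VALUES (?, ?, ?)
// so an existing CREATED_DATE is simply overwritten.
rdd.saveToPhoenix(
  "MY_TABLE",
  Seq("ID", "COL1", "CREATED_DATE"),
  zkUrl = Some(zkQuorum))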

Re: timestamp, the phoenix-spark API only supports using the native Phoenix
time formats, i.e. DATE and TIMESTAMP.
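
In practice that means carrying the timestamp as an ordinary column value
rather than setting the HBase cell timestamp. A minimal sketch, assuming a
Phoenix TIMESTAMP column EVENT_TS on a hypothetical MY_TABLE, that
processedRdd yields (id, millis) pairs, and that sqlContext and zkQuorum are
the ones from your snippet:

import java.sql.Timestamp
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// The timestamp travels as a regular TIMESTAMP column in the row.
val schema = StructType(Seq(
  StructField("ID", StringType, nullable = false),
  StructField("EVENT_TS", TimestampType, nullable = true)))

val rowRdd = processedRdd.map { case (id: String, millis: Long) =>
  Row(id, new Timestamp(millis))
}
val df = sqlContext.createDataFrame(rowRdd, schema)
df.save("org.apache.phoenix.spark", SaveMode.Overwrite,
  Map("table" -> "MY_TABLE", "zkUrl" -> zkQuorum))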

Good luck,

Josh

On Fri, Jul 17, 2015 at 11:40 PM, kaye ann  wrote:

> I am using Spark 1.3, HBase 1.1 and Phoenix 4.4. I have this in my code:
>
> val rdd = processedRdd.map(r => Row.fromSeq(r))
> val dataframe = sqlContext.createDataFrame(rdd, schema)
> dataframe.save("org.apache.phoenix.spark", SaveMode.Overwrite,
> Map("table" -> HTABLE, "zkUrl" -> zkQuorum))
>
> This code works, but...
> 1. How do I implement HBase's checkAndPut using the Phoenix-Spark API?
> CREATED_DATE is always set to DateTime.now() in the dataframe.
> I don't want that field to be updated when the row already exists in HBase
> but other fields are being updated.
> With the HBase API I can achieve this by putting all the fields and using
> checkAndPut on the created_date field.
> 2. How do I set an HBase timestamp using Phoenix-Spark, similar to the
> HBase API:
> Put(rowkey, timestamp.getMillis)
> -
> This is my code using the HBase API that I am trying to convert to
> Phoenix-Spark, since I think Phoenix-Spark is better optimized:
>
> rdd.foreachPartition(p => {
>   val conf = HBaseConfiguration.create()
>   val hTable = new HTable(conf, HTABLE)
>   hTable.setAutoFlushTo(false)
>
>   p.foreach(r => {
>     val hTimestamp = ...
>     val rowkey = ...
>
>     val hRow = new Put(rowkey, hTimestamp.getMillis)
>     r.filter(...).foreach(tuple =>
>       hRow.add(toBytes(tuple._1), toBytes(tuple._2), toBytes(tuple._3))
>     )
>     hTable.put(hRow)
>
>     val CREATED_DATE_PUT = new Put(rowkey, hTimestamp.getMillis)
>       .add(toBytes(CF), toBytes(CREATED_DATE), toBytes(now))
>     hTable.checkAndPut(rowkey, toBytes(CF), toBytes(CREATED_DATE), null,
>       CREATED_DATE_PUT)
>
>   })
>   hTable.flushCommits()
>   hTable.close()
> })
>
>