HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eec27ad7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eec27ad7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eec27ad7

Branch: refs/heads/HBASE-14850
Commit: eec27ad7ef7b5078f705301bd3042991d4d4b4d9
Parents: 8c7f044
Author: tedyu <yuzhih...@gmail.com>
Authored: Thu Mar 31 19:08:33 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Thu Mar 31 19:08:33 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      |   8 +-
 .../spark/datasources/HBaseSparkConf.scala      |   5 +
 .../spark/datasources/HBaseTableScanRDD.scala   |  26 +++++
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 105 ++++++++++++++++---
 4 files changed, 130 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/eec27ad7/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 7970816..c71ee4e 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -88,6 +88,12 @@ case class HBaseRelation (
     userSpecifiedSchema: Option[StructType]
   )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan  with InsertableRelation  with 
Logging {
+
+  val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
+  val minTimeStamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
+  val maxTimeStamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+  val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
+
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
   val configResources = 
parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
@@ -204,7 +210,7 @@ case class HBaseRelation (
         System.arraycopy(x, 0, rBytes, offset, x.length)
         offset += x.length
       }
-      val put = new Put(rBytes)
+      val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
 
       colsIdxedFields.foreach { case (x, y) =>
         val b = Utils.toBytes(row(x), y)

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec27ad7/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index ca44d42..2e4c0b3 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -36,4 +36,9 @@ object HBaseSparkConf{
   val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
   val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
   val defaultPushDownColumnFilter = true
+
+  val TIMESTAMP = "timestamp"
+  val MIN_TIMESTAMP = "minTimestamp"
+  val MAX_TIMESTAMP = "maxTimestamp"
+  val MAX_VERSIONS = "maxVersions"
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec27ad7/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 2e05651..886114a 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -105,6 +105,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       val gets = new ArrayList[Get]()
       x.foreach{ y =>
         val g = new Get(y)
+        handleTimeSemantics(g)
         columns.foreach { d =>
           if (!d.isRowKey) {
             g.addColumn(d.cfBytes, d.colBytes)
@@ -157,6 +158,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       case (Some(Bound(a, b)), None) => new Scan(a)
       case (None, None) => new Scan()
     }
+    handleTimeSemantics(scan)
 
     columns.foreach { d =>
       if (!d.isRowKey) {
@@ -226,6 +228,30 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     } ++ gIt
     rIts
   }
+
+  private def handleTimeSemantics(query: Query): Unit = {
+    // Set timestamp related values if present
+    (query, relation.timestamp, relation.minTimeStamp, relation.maxTimeStamp)  
match {
+      case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
+      case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
+
+      case (q:Scan, None, Some(minStamp), Some(maxStamp)) => 
q.setTimeRange(minStamp, maxStamp)
+      case (q:Get, None, Some(minStamp), Some(maxStamp)) => 
q.setTimeRange(minStamp, maxStamp)
+
+      case (q, None, None, None) =>
+
+      case _ => throw new IllegalArgumentException(s"Invalid combination of 
query/timestamp/time range provided. " +
+        s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: 
${relation.minTimeStamp.get}, " +
+        s"maxTimeStamp is: ${relation.maxTimeStamp.get}")
+    }
+    if (relation.maxVersions.isDefined) {
+      query match {
+        case q: Scan => q.setMaxVersions(relation.maxVersions.get)
+        case q: Get => q.setMaxVersions(relation.maxVersions.get)
+        case _ => throw new IllegalArgumentException("Invalid query provided 
with maxVersions")
+      }
+    }
+  }
 }
 
 case class SerializedFilter(b: Option[Array[Byte]])

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec27ad7/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 500967d..4ad10e2 100644
--- 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -19,32 +19,39 @@ package org.apache.hadoop.hbase.spark
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
 import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
+import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
 import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 case class HBaseRecord(
-    col0: String,
-    col1: String,
-    col2: Double,
-    col3: Float,
-    col4: Int,
-    col5: Long)
+  col0: String,
+  col1: Boolean,
+  col2: Double,
+  col3: Float,
+  col4: Int,
+  col5: Long,
+  col6: Short,
+  col7: String,
+  col8: Byte)
 
 object HBaseRecord {
   def apply(i: Int, t: String): HBaseRecord = {
     val s = s"""row${"%03d".format(i)}"""
     HBaseRecord(s,
-      s,
+      i % 2 == 0,
       i.toDouble,
       i.toFloat,
       i,
-      i.toLong)
+      i.toLong,
+      i.toShort,
+      s"String$i: $t",
+      i.toByte)
   }
 }
 
@@ -815,11 +822,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
                     |"rowkey":"key",
                     |"columns":{
                     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
-                    |"col1":{"cf":"cf1", "col":"col1", "type":"string"},
+                    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
                     |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
                     |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
                     |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
-                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}}
+                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
+                    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
+                    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
+                    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
                     |}
                     |}""".stripMargin
 
@@ -866,6 +876,75 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(s.count() == 6)
   }
 
+  test("Timestamp semantics") {
+    val sql = sqlContext
+    import sql.implicits._
+
+    // There's already some data in here from recently. Let's throw something 
in
+    // from 1993 which we can include/exclude and add some data with the 
implicit (now) timestamp.
+    // Then we should be able to cross-section it and only get points in 
between, get the most recent view
+    // and get an old view.
+    val oldMs = 754869600000L
+    val startMs = System.currentTimeMillis()
+    val oldData = (0 to 100).map { i =>
+      HBaseRecord(i, "old")
+    }
+    val newData = (200 to 255).map { i =>
+      HBaseRecord(i, "new")
+    }
+
+    sc.parallelize(oldData).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, 
HBaseTableCatalog.tableName -> "5",
+        HBaseSparkConf.TIMESTAMP -> oldMs.toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+    sc.parallelize(newData).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, 
HBaseTableCatalog.tableName -> "5"))
+      .format("org.apache.hadoop.hbase.spark")
+      .save()
+
+    // Test specific timestamp -- Full scan, Timestamp
+    val individualTimestamp = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, 
HBaseSparkConf.TIMESTAMP -> oldMs.toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load();
+    assert(individualTimestamp.count() == 101)
+
+    // Test getting everything -- Full Scan, No range
+    val everything = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(everything.count() == 256)
+    // Test getting everything -- Pruned Scan, TimeRange
+    val element50 = everything.where(col("col0") === 
lit("row050")).select("col7").collect()(0)(0)
+    assert(element50 == "String50: extra")
+    val element200 = everything.where(col("col0") === 
lit("row200")).select("col7").collect()(0)(0)
+    assert(element200 == "String200: new")
+
+    // Test Getting old stuff -- Full Scan, TimeRange
+    val oldRange = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, 
HBaseSparkConf.MIN_TIMESTAMP -> "0",
+        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(oldRange.count() == 101)
+    // Test Getting old stuff -- Pruned Scan, TimeRange
+    val oldElement50 = oldRange.where(col("col0") === 
lit("row050")).select("col7").collect()(0)(0)
+    assert(oldElement50 == "String50: old")
+
+    // Test Getting middle stuff -- Full Scan, TimeRange
+    val middleRange = sqlContext.read
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, 
HBaseSparkConf.MIN_TIMESTAMP -> "0",
+        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+      .format("org.apache.hadoop.hbase.spark")
+      .load()
+    assert(middleRange.count() == 256)
+    // Test Getting middle stuff -- Pruned Scan, TimeRange
+    val middleElement200 = middleRange.where(col("col0") === 
lit("row200")).select("col7").collect()(0)(0)
+    assert(middleElement200 == "String200: extra")
+  }
+
   // catalog for insertion
   def avroWriteCatalog = s"""{
                              |"table":{"namespace":"default", 
"name":"avrotable"},

Reply via email to