Revert "HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing 
Yang)"

This reverts commit eec27ad7ef7b5078f705301bd3042991d4d4b4d9.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6905d272
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6905d272
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6905d272

Branch: refs/heads/HBASE-14850
Commit: 6905d272d34f0d674cfd73c6a8d579e3231b5a78
Parents: eec27ad
Author: Sean Busbey <bus...@apache.org>
Authored: Thu Mar 31 21:40:50 2016 -0500
Committer: Sean Busbey <bus...@apache.org>
Committed: Thu Mar 31 21:40:50 2016 -0500

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      |   8 +-
 .../spark/datasources/HBaseSparkConf.scala      |   5 -
 .../spark/datasources/HBaseTableScanRDD.scala   |  26 -----
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 105 +++----------------
 4 files changed, 14 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
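For reference, the reverted feature was exercised through plain datasource options; a minimal sketch of the read path it enabled, assuming an existing SQLContext and a catalog JSON string (the "minTimestamp"/"maxTimestamp" keys are the string values of the removed HBaseSparkConf constants):

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

    // Read only cells whose timestamps fall in [minTs, maxTs); everything else
    // is a stock DataFrameReader call against the hbase-spark datasource.
    def readTimeRange(sqlContext: SQLContext, catalog: String, minTs: Long, maxTs: Long): DataFrame =
      sqlContext.read
        .options(Map(
          HBaseTableCatalog.tableCatalog -> catalog,
          "minTimestamp" -> minTs.toString,
          "maxTimestamp" -> maxTs.toString))
        .format("org.apache.hadoop.hbase.spark")
        .load()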


http://git-wip-us.apache.org/repos/asf/hbase/blob/6905d272/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index c71ee4e..7970816 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -88,12 +88,6 @@ case class HBaseRelation (
     userSpecifiedSchema: Option[StructType]
   )(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan  with InsertableRelation  with Logging {
-
-  val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimeStamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
-  val maxTimeStamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
-  val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
@@ -210,7 +204,7 @@ case class HBaseRelation (
         System.arraycopy(x, 0, rBytes, offset, x.length)
         offset += x.length
       }
-      val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
+      val put = new Put(rBytes)
 
       colsIdxedFields.foreach { case (x, y) =>
         val b = Utils.toBytes(row(x), y)
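For context, the removed fold above chose the Put constructor based on whether a timestamp option was configured; a standalone illustration of that idiom (helper name is hypothetical):

    import org.apache.hadoop.hbase.client.Put

    // If a timestamp was configured, stamp every cell in the Put with it;
    // otherwise fall back to the default (server-assigned) timestamp.
    def buildPut(rowKey: Array[Byte], timestamp: Option[Long]): Put =
      timestamp.fold(new Put(rowKey))(ts => new Put(rowKey, ts))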

http://git-wip-us.apache.org/repos/asf/hbase/blob/6905d272/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 2e4c0b3..ca44d42 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -36,9 +36,4 @@ object HBaseSparkConf{
   val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
   val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
   val defaultPushDownColumnFilter = true
-
-  val TIMESTAMP = "timestamp"
-  val MIN_TIMESTAMP = "minTimestamp"
-  val MAX_TIMESTAMP = "maxTimestamp"
-  val MAX_VERSIONS = "maxVersions"
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6905d272/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 886114a..2e05651 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -105,7 +105,6 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       val gets = new ArrayList[Get]()
       x.foreach{ y =>
         val g = new Get(y)
-        handleTimeSemantics(g)
         columns.foreach { d =>
           if (!d.isRowKey) {
             g.addColumn(d.cfBytes, d.colBytes)
@@ -158,7 +157,6 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       case (Some(Bound(a, b)), None) => new Scan(a)
       case (None, None) => new Scan()
     }
-    handleTimeSemantics(scan)
 
     columns.foreach { d =>
       if (!d.isRowKey) {
@@ -228,30 +226,6 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     } ++ gIt
     rIts
   }
-
-  private def handleTimeSemantics(query: Query): Unit = {
-    // Set timestamp related values if present
-    (query, relation.timestamp, relation.minTimeStamp, relation.maxTimeStamp) match {
-      case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
-      case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
-
-      case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-      case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-
-      case (q, None, None, None) =>
-
-      case _ => throw new IllegalArgumentException(s"Invalid combination of 
query/timestamp/time range provided. " +
-        s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: 
${relation.minTimeStamp.get}, " +
-        s"maxTimeStamp is: ${relation.maxTimeStamp.get}")
-    }
-    if (relation.maxVersions.isDefined) {
-      query match {
-        case q: Scan => q.setMaxVersions(relation.maxVersions.get)
-        case q: Get => q.setMaxVersions(relation.maxVersions.get)
-        case _ => throw new IllegalArgumentException("Invalid query provided 
with maxVersions")
-      }
-    }
-  }
 }
 
 case class SerializedFilter(b: Option[Array[Byte]])
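For context, the deleted handleTimeSemantics accepted either a single timestamp, a min/max pair, or neither, plus an optional maxVersions; a sketch of the same checks applied to a plain Scan (parameter names illustrative):

    import org.apache.hadoop.hbase.client.Scan

    def applyTimeSemantics(scan: Scan,
                           timestamp: Option[Long],
                           minTimeStamp: Option[Long],
                           maxTimeStamp: Option[Long],
                           maxVersions: Option[Int]): Scan = {
      (timestamp, minTimeStamp, maxTimeStamp) match {
        case (Some(ts), None, None)       => scan.setTimeStamp(ts)       // exact-timestamp read
        case (None, Some(min), Some(max)) => scan.setTimeRange(min, max) // range is [min, max)
        case (None, None, None)           =>                             // no time constraint
        case _ =>
          throw new IllegalArgumentException("Invalid timestamp/time range combination")
      }
      maxVersions.foreach(scan.setMaxVersions)
      scan
    }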

http://git-wip-us.apache.org/repos/asf/hbase/blob/6905d272/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 4ad10e2..500967d 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -19,39 +19,32 @@ package org.apache.hadoop.hbase.spark
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
+import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
 import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
+import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
 import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 case class HBaseRecord(
-  col0: String,
-  col1: Boolean,
-  col2: Double,
-  col3: Float,
-  col4: Int,
-  col5: Long,
-  col6: Short,
-  col7: String,
-  col8: Byte)
+    col0: String,
+    col1: String,
+    col2: Double,
+    col3: Float,
+    col4: Int,
+    col5: Long)
 
 object HBaseRecord {
   def apply(i: Int, t: String): HBaseRecord = {
     val s = s"""row${"%03d".format(i)}"""
     HBaseRecord(s,
-      i % 2 == 0,
+      s,
       i.toDouble,
       i.toFloat,
       i,
-      i.toLong,
-      i.toShort,
-      s"String$i: $t",
-      i.toByte)
+      i.toLong)
   }
 }
 
@@ -822,14 +815,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
                     |"rowkey":"key",
                     |"columns":{
                     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
-                    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
+                    |"col1":{"cf":"cf1", "col":"col1", "type":"string"},
                     |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
                     |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
                     |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
-                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
-                    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
-                    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
-                    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
+                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}}
                     |}
                     |}""".stripMargin
 
@@ -876,75 +866,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(s.count() == 6)
   }
 
-  test("Timestamp semantics") {
-    val sql = sqlContext
-    import sql.implicits._
-
-    // There's already some data in here from recently. Let's throw something in
-    // from 1993 which we can include/exclude and add some data with the implicit (now) timestamp.
-    // Then we should be able to cross-section it and only get points in between, get the most recent view
-    // and get an old view.
-    val oldMs = 754869600000L
-    val startMs = System.currentTimeMillis()
-    val oldData = (0 to 100).map { i =>
-      HBaseRecord(i, "old")
-    }
-    val newData = (200 to 255).map { i =>
-      HBaseRecord(i, "new")
-    }
-
-    sc.parallelize(oldData).toDF.write.options(
-      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
-        HBaseSparkConf.TIMESTAMP -> oldMs.toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-    sc.parallelize(newData).toDF.write.options(
-      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-
-    // Test specific timestamp -- Full scan, Timestamp
-    val individualTimestamp = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load();
-    assert(individualTimestamp.count() == 101)
-
-    // Test getting everything -- Full Scan, No range
-    val everything = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(everything.count() == 256)
-    // Test getting everything -- Pruned Scan, TimeRange
-    val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
-    assert(element50 == "String50: extra")
-    val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
-    assert(element200 == "String200: new")
-
-    // Test Getting old stuff -- Full Scan, TimeRange
-    val oldRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(oldRange.count() == 101)
-    // Test Getting old stuff -- Pruned Scan, TimeRange
-    val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
-    assert(oldElement50 == "String50: old")
-
-    // Test Getting middle stuff -- Full Scan, TimeRange
-    val middleRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(middleRange.count() == 256)
-    // Test Getting middle stuff -- Pruned Scan, TimeRange
-    val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
-    assert(middleElement200 == "String200: extra")
-  }
-
   // catalog for insertion
   def avroWriteCatalog = s"""{
                              |"table":{"namespace":"default", 
"name":"avrotable"},
