This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ee8f6  HBASE-27488 [hbase-connectors] Duplicate result when searching HBase by Spark (#106)
f8ee8f6 is described below

commit f8ee8f6b78432d8fa0884105ec04fdd58c3e37b3
Author: ILuffZhe <37180946+iluff...@users.noreply.github.com>
AuthorDate: Wed Aug 2 19:54:32 2023 +0800

    HBASE-27488 [hbase-connectors] Duplicate result when searching HBase by Spark (#106)
    
    Signed-off-by: Reid Chan <reidc...@apache.org>
---
 .../hbase/spark/datasources/HBaseTableScanRDD.scala    | 18 +++++++++++-------
 .../apache/hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++++++++++++++++
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index fe325f7..c334076 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -114,16 +114,20 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       hbaseContext: HBaseContext): Iterator[Result] = {
     g.grouped(relation.bulkGetSize).flatMap{ x =>
       val gets = new ArrayList[Get](x.size)
+      val rowkeySet = new mutable.HashSet[String]()
       x.foreach{ y =>
-        val g = new Get(y)
-        handleTimeSemantics(g)
-        columns.foreach { d =>
-          if (!d.isRowKey) {
-            g.addColumn(d.cfBytes, d.colBytes)
+        if (!rowkeySet.contains(y.mkString("Array(", ", ", ")"))) {
+          val g = new Get(y)
+          handleTimeSemantics(g)
+          columns.foreach { d =>
+            if (!d.isRowKey) {
+              g.addColumn(d.cfBytes, d.colBytes)
+            }
           }
+          filter.foreach(g.setFilter(_))
+          gets.add(g)
+          rowkeySet.add(y.mkString("Array(", ", ", ")"))
         }
-        filter.foreach(g.setFilter(_))
-        gets.add(g)
       }
       hbaseContext.applyCreds()
       val tmp = tbr.get(gets)
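
A side note on why the patch encodes each rowkey as a String before adding it to the HashSet: on the JVM, Array[Byte] uses reference equality, so a HashSet[Array[Byte]] would not detect two arrays with identical contents. The following standalone sketch (object and method names are illustrative, not from the patch) shows the same dedup-by-encoded-key technique in isolation:

import scala.collection.mutable

object RowkeyDedupSketch {
  // Keep only the first occurrence of each rowkey. Array[Byte] has
  // reference equality on the JVM, so the set is keyed on a stable
  // String encoding of the bytes, mirroring the patch's mkString call.
  def dedup(rowkeys: Seq[Array[Byte]]): Seq[Array[Byte]] = {
    val seen = new mutable.HashSet[String]()
    // HashSet.add returns true only when the key was not already present
    rowkeys.filter(k => seen.add(k.mkString("Array(", ", ", ")")))
  }

  def main(args: Array[String]): Unit = {
    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3) // same contents, different object
    println(dedup(Seq(a, b)).length) // prints 1
  }
}
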
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 366c9ba..47145d3 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -297,6 +297,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(executionRules.rowKeyFilter.ranges.size == 0)
   }
 
+  /**
+   * An example of querying three fields while using only rowkey points for the filter,
+   * where some rowkey points are duplicates.
+   */
+  test("Test rowKey point only rowKey query, which contains duplicate rowkey") 
{
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+      "WHERE " +
+      "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get1')").take(10)
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(results.length == 2)
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD == 0 OR KEY_FIELD == 1 )"))
+    assert(executionRules.rowKeyFilter.points.size == 2)
+    assert(executionRules.rowKeyFilter.ranges.size == 0)
+  }
+
   /**
   * An example of querying three fields while using only cell points for the filter
    */
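
For background on the bug itself (a sketch for context, not part of this commit): the HBase client's Table.get(java.util.List[Get]) returns one Result per Get, so duplicate rowkey points in the WHERE clause produced duplicate rows until the Gets were deduplicated. The table name "t1" and row "get1" below are hypothetical, and a reachable cluster is assumed:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object DuplicateGetDemo {
  def main(args: Array[String]): Unit = {
    // Assumes an existing table "t1" containing a row with key "get1"
    val conn = ConnectionFactory.createConnection()
    val table = conn.getTable(TableName.valueOf("t1"))
    val gets = java.util.Arrays.asList(
      new Get(Bytes.toBytes("get1")),
      new Get(Bytes.toBytes("get1"))) // duplicate point, as in the test's SQL
    val results = table.get(gets) // one Result per Get: length 2 here
    println(results.length)
    table.close()
    conn.close()
  }
}
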
