This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git
The following commit(s) were added to refs/heads/master by this push:
     new f8ee8f6  HBASE-27488 [hbase-connectors] Duplicate result when searching HBase by Spark (#106)
f8ee8f6 is described below

commit f8ee8f6b78432d8fa0884105ec04fdd58c3e37b3
Author: ILuffZhe <37180946+iluff...@users.noreply.github.com>
AuthorDate: Wed Aug 2 19:54:32 2023 +0800

    HBASE-27488 [hbase-connectors] Duplicate result when searching HBase by Spark (#106)

    Signed-off-by: Reid Chan <reidc...@apache.org>
---
 .../hbase/spark/datasources/HBaseTableScanRDD.scala    | 18 +++++++++++-------
 .../apache/hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++++++++++++++++
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index fe325f7..c334076 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -114,16 +114,20 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       hbaseContext: HBaseContext): Iterator[Result] = {
     g.grouped(relation.bulkGetSize).flatMap{ x =>
       val gets = new ArrayList[Get](x.size)
+      val rowkeySet = new mutable.HashSet[String]()
       x.foreach{ y =>
-        val g = new Get(y)
-        handleTimeSemantics(g)
-        columns.foreach { d =>
-          if (!d.isRowKey) {
-            g.addColumn(d.cfBytes, d.colBytes)
+        if (!rowkeySet.contains(y.mkString("Array(", ", ", ")"))) {
+          val g = new Get(y)
+          handleTimeSemantics(g)
+          columns.foreach { d =>
+            if (!d.isRowKey) {
+              g.addColumn(d.cfBytes, d.colBytes)
+            }
           }
+          filter.foreach(g.setFilter(_))
+          gets.add(g)
+          rowkeySet.add(y.mkString("Array(", ", ", ")"))
         }
-        filter.foreach(g.setFilter(_))
-        gets.add(g)
       }
       hbaseContext.applyCreds()
       val tmp = tbr.get(gets)
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 366c9ba..47145d3 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -297,6 +297,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(executionRules.rowKeyFilter.ranges.size == 0)
   }
 
+  /**
+   * A example of query three fields and also only using rowkey points for the filter,
+   * some rowkey points are duplicate.
+   */
+  test("Test rowKey point only rowKey query, which contains duplicate rowkey") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+      "WHERE " +
+      "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get1')").take(10)
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(results.length == 2)
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD == 0 OR KEY_FIELD == 1 )"))
+    assert(executionRules.rowKeyFilter.points.size == 2)
+    assert(executionRules.rowKeyFilter.ranges.size == 0)
+  }
+
   /**
    * A example of query three fields and also only using cell points for the filter
    */
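
For context on the change: the bulk-get path previously built one Get per incoming rowkey, so a query whose rowkey points contained the same key more than once issued identical Gets and returned duplicate Results. Because Scala's Array[Byte] compares by reference, the patch keys a mutable.HashSet on the array's rendered contents (y.mkString("Array(", ", ", ")")) so that value-equal rowkeys are detected and each Get is built only once. Below is a minimal, self-contained sketch of that idea; RowkeyDedupSketch and buildGetsOnce are illustrative names only and are not part of the connector.

// Minimal, self-contained sketch of the deduplication idea (illustrative names,
// not connector code): Array[Byte] has reference equality, so the seen-set is
// keyed on the rendered contents of each rowkey.
import scala.collection.mutable

object RowkeyDedupSketch {
  // Stand-in for the bulk-get path: returns one entry per distinct rowkey,
  // in first-seen order, instead of one entry per incoming rowkey.
  def buildGetsOnce(rowkeys: Seq[Array[Byte]]): Seq[String] = {
    val seen = new mutable.HashSet[String]()
    val gets = mutable.ArrayBuffer.empty[String]
    rowkeys.foreach { key =>
      val rendered = key.mkString("Array(", ", ", ")") // value-based identity for the array
      if (!seen.contains(rendered)) {
        gets += rendered // the real code builds a Get and adds columns, filter, time semantics
        seen.add(rendered)
      }
    }
    gets.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 'get1' appears twice as two distinct array instances, mirroring the added test.
    val keys = Seq("get1".getBytes, "get2".getBytes, "get1".getBytes)
    println(buildGetsOnce(keys).size) // prints 2, not 3
  }
}

The String rendering mirrors what the committed change does; wrapping the bytes in a value-equal type (for example java.nio.ByteBuffer) would be another way to get set semantics, but the sketch follows the patch.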