PHOENIX-4966 Implement unhandledFilters in PhoenixRelation so that spark only evaluates filters when required
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a694638f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a694638f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a694638f

Branch: refs/heads/4.x-cdh5.15
Commit: a694638fa8b7a4c7bd1a0b3b2b8874830f7760e8
Parents: fb1e8f7
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Thu Oct 11 23:46:48 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Wed Oct 17 22:50:43 2018 +0100

----------------------------------------------------------------------
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 14 +++++++-------
 .../org/apache/phoenix/spark/PhoenixRelation.scala | 16 ++++++++++++----
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a694638f/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index b8e44fe..4e11acc 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -285,13 +285,13 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     // Make sure we got the right value back
     assert(res.first().getLong(0) == 1L)
 
-    /*
-      NOTE: There doesn't appear to be any way of verifying from the Spark query planner that
-      filtering is being pushed down and done server-side. However, since PhoenixRelation
-      implements PrunedFilteredScan, debugging has shown that both the SELECT columns and WHERE
-      predicates are being passed along to us, which we then forward it to Phoenix.
-      TODO: investigate further to find a way to verify server-side pushdown
-    */
+    val plan = res.queryExecution.sparkPlan
+    // filters should be pushed into phoenix relation
+    assert(plan.toString.contains("PushedFilters: [IsNotNull(COL1), IsNotNull(ID), " +
+      "EqualTo(COL1,test_row_1), EqualTo(ID,1)]"))
+    // spark should not run the filters on the rows returned by Phoenix
+    assert(!plan.toString.contains("Filter (((isnotnull(COL1#8) && isnotnull(ID#7L)) " +
+      "&& (COL1#8 = test_row_1)) && (ID#7L = 1))"))
   }
 
   test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a694638f/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index d2eac8c..38bf29a 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -36,11 +36,12 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
     but this prevents having to load the whole table into Spark first.
   */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val(pushedFilters, unhandledFilters) = buildFilter(filters)
     new PhoenixRDD(
       sqlContext.sparkContext,
       tableName,
       requiredColumns,
-      Some(buildFilter(filters)),
+      Some(pushedFilters),
       Some(zkUrl),
       new Configuration(),
       dateAsTimestamp
@@ -62,12 +63,13 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
 
   // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
   // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
-  private def buildFilter(filters: Array[Filter]): String = {
+  private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = {
     if (filters.isEmpty) {
-      return ""
+      return ("" , Array[Filter]())
     }
 
     val filter = new StringBuilder("")
+    val unsupportedFilters = Array[Filter]();
     var i = 0
 
     filters.foreach(f => {
@@ -92,12 +94,18 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
         case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
         case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
         case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
+        case _ => unsupportedFilters :+ f
       }
 
       i = i + 1
     })
 
-    filter.toString()
+    (filter.toString(), unsupportedFilters)
+  }
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    val(pushedFilters, unhandledFilters) = buildFilter(filters)
+    unhandledFilters
   }
 
   // Helper function to escape column key to work with SQL queries
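----------------------------------------------------------------------

For context, unhandledFilters is part of Spark's BaseRelation API: any filter a
relation returns from it is re-evaluated by Spark in a Filter node above the
scan, while filters it omits are trusted entirely to the data source. That
contract is what lets the updated test assert both that the predicates appear
under PushedFilters and that no residual Spark Filter node is planned. Below is
a minimal, self-contained sketch of the push-down split; the splitFilters name,
the two handled predicate types, and the naive value quoting are illustrative
simplifications, not code from this commit. Note that Scala's Array :+ returns
a new array rather than appending in place, so the sketch accumulates the
unhandled filters in a mutable ArrayBuffer:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

    // Sketch: compile the pushable predicates into a Phoenix WHERE fragment
    // and collect everything else so unhandledFilters() can hand it back to
    // Spark. Only two predicate types are shown; the real buildFilter covers
    // many more.
    def splitFilters(filters: Array[Filter]): (String, Array[Filter]) = {
      val where = new StringBuilder
      // Array.:+ returns a new array instead of appending in place, so a
      // mutable buffer is used to actually accumulate the unhandled filters.
      val unhandled = ArrayBuffer.empty[Filter]
      var pushed = 0
      def and(): Unit = { if (pushed > 0) where.append(" AND"); pushed += 1 }
      filters.foreach {
        case EqualTo(attr, value) =>
          and(); where.append(s""" "$attr" = '$value'""") // real code escapes via compileValue
        case IsNotNull(attr) =>
          and(); where.append(s""" "$attr" IS NOT NULL""")
        case other =>
          unhandled += other // not pushable; Spark evaluates it after the scan
      }
      (where.toString, unhandled.toArray)
    }

    // Usage: splitFilters(Array(EqualTo("COL1", "test_row_1"), IsNotNull("ID")))
    // yields (""" "COL1" = 'test_row_1' AND "ID" IS NOT NULL""", Array())

With this shape, unhandledFilters itself reduces to returning the second
element of the tuple, which is exactly what the new override in the patch does.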
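To observe the effect end to end, the connector's DataFrame read path can be
exercised from spark-shell. The session below is hypothetical: the table name,
the zkUrl, and a Spark 2.x SparkSession named spark are assumptions, and the
expected plan fragments are the ones the updated test checks for.

    // Hypothetical session; TABLE1 and the zkUrl are placeholders.
    val df = spark.read
      .format("org.apache.phoenix.spark")
      .option("table", "TABLE1")
      .option("zkUrl", "localhost:2181")
      .load()

    val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)

    println(res.queryExecution.sparkPlan)
    // With unhandledFilters implemented, the scan line should contain
    //   PushedFilters: [IsNotNull(COL1), IsNotNull(ID), EqualTo(COL1,test_row_1), EqualTo(ID,1)]
    // and no residual Spark "Filter (...)" node is planned for these predicates.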