PHOENIX-2336 PHOENIX-2290 PHOENIX-2547 Various phoenix-spark fixes (Kalyan Hadoop)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2afb16dc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2afb16dc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2afb16dc

Branch: refs/heads/calcite
Commit: 2afb16dc2032f2be9de220946e97f87336218e80
Parents: ba82b1c
Author: Josh Mahonin <jmaho...@interset.com>
Authored: Mon Aug 15 11:55:56 2016 -0400
Committer: Josh Mahonin <jmaho...@interset.com>
Committed: Mon Aug 15 11:58:41 2016 -0400

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/setup.sql       |  6 +++
 .../apache/phoenix/spark/PhoenixSparkIT.scala  | 49 ++++++++++++++++++++
 .../apache/phoenix/spark/PhoenixRelation.scala | 26 ++++++++-----
 3 files changed, 70 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2afb16dc/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index aa2cee1..e56924f 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -48,3 +48,9 @@ CREATE TABLE TEST_SMALL_TINY (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT, COL
 UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
 CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
 UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
+CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
+UPSERT INTO "space" VALUES ('key1', 'xyz')
+CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
+UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
+UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
+UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
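The double-quoted identifiers above are the crux of all three fixes: Phoenix upper-cases unquoted names, so the table and column names stay lower-case (and may contain spaces) only because they are quoted in the DDL. A minimal sketch of reading such a table through the Spark plugin, assuming an existing SparkContext `sc` and a placeholder ZooKeeper quorum of localhost:2181:

    import org.apache.phoenix.util.SchemaUtil
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // getEscapedArgument("space") wraps the name in double quotes, so the
    // lower-case table name survives when the plugin builds its Phoenix query.
    val df = sqlContext.load("org.apache.phoenix.spark",
      Map("table" -> SchemaUtil.getEscapedArgument("space"),
          "zkUrl" -> "localhost:2181"))  // placeholder; use your own quorum
    df.show()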
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2afb16dc/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index c216406..7d05f07 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -621,6 +621,55 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     assert(Math.abs(epoch - dt) < 86400000)
   }
 
+  test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"),
+      "zkUrl" -> quorumAddress))
+    val res = df.filter(df.col("first name").equalTo("xyz"))
+    // Make sure we got the right value back
+    assert(res.collectAsList().size() == 1L)
+  }
+
+  test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
+      "zkUrl" -> quorumAddress))
+    df.registerTempTable("temp")
+
+    // Limitation: filter / where expressions must not use "double quotes"; pass column expressions instead.
+    // Reason: if the expression contains double quotes, the Spark SQL parser does not evaluate it and
+    // hands it down to the next level to handle.
+
+    val res1 = sqlContext.sql("select * from temp where salary = '10000' ")
+    assert(res1.collectAsList().size() == 1L)
+
+    val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ")
+    assert(res2.collectAsList().size() == 0L)
+
+    val res3 = sqlContext.sql("select * from temp where salary > '10000' ")
+    assert(res3.collectAsList().size() == 2L)
+  }
+
+  test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
+      "zkUrl" -> quorumAddress))
+
+    // Limitation: filter / where expressions must not use "double quotes"; pass column expressions instead.
+    // Reason: if the expression contains double quotes, the Spark SQL parser does not evaluate it and
+    // hands it down to the next level to handle.
+
+    val res1 = df.filter(df.col("first name").equalTo("foo"))
+    assert(res1.collectAsList().size() == 1L)
+
+    val res2 = df.filter("\"first name\" = 'foo'")
+    assert(res2.collectAsList().size() == 0L)
+
+    val res3 = df.filter("salary = '10000'")
+    assert(res3.collectAsList().size() == 1L)
+
+    val res4 = df.filter("salary > '10000'")
+    assert(res4.collectAsList().size() == 2L)
+  }
+
   test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
     val sqlContext = new SQLContext(sc)
     val df = sqlContext.read
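One reading of why res2 returns zero rows in both tests: in the Spark 1.x SQL dialect, double quotes delimit string literals, so "salary" = '10000' compares two constants rather than referencing the column and is false for every row; this interpretation is consistent with the limitation comments in the tests above but is not stated in the commit itself. Column expressions bypass the SQL parser, so the attribute name reaches the Phoenix filter compiler verbatim. A hedged sketch against the "small" table loaded above, with the Phoenix predicate each pushdown produces after this change shown as a trailing comment:

    import org.apache.spark.sql.functions.col

    // Each Column expression below is pushed down through buildFilter
    // (see the PhoenixRelation diff that follows).
    val byName   = df.filter(col("first name").equalTo("foo"))   // "first name" = 'foo'
    val bySalary = df.filter(col("salary") > 10000)              // "salary" > 10000
    val byPrefix = df.filter(col("first name").startsWith("f"))  // "first name" LIKE 'f%'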
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2afb16dc/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 8d7f9f7..d2eac8c 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.phoenix.util.StringUtil.escapeStringConstant
+import org.apache.phoenix.util.SchemaUtil
 
 case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan {
@@ -80,17 +81,17 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
       case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
       case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
       case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
-      case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
-      case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
-      case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
-      case LessThan(attr, value) => filter.append(s" $attr < ${compileValue(value)}")
-      case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
-      case IsNull(attr) => filter.append(s" $attr IS NULL")
-      case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
-      case In(attr, values) => filter.append(s" $attr IN ${values.map(compileValue).mkString("(", ",", ")")}")
-      case StringStartsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue(value + "%")}")
-      case StringEndsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value)}")
-      case StringContains(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value + "%")}")
+      case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
+      case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
+      case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
+      case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
+      case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
+      case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
+      case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
+      case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
+      case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
+      case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
+      case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
     }
 
     i = i + 1
@@ -99,6 +100,9 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
     filter.toString()
   }
 
+  // Helper function to escape column key to work with SQL queries
+  private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key)
+
   // Helper function to escape string values in SQL queries
   private def compileValue(value: Any): Any = value match {
    case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
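For reference, a self-contained sketch of what the new escaping produces. The two private helpers below are simplified stand-ins for SchemaUtil.getEscapedArgument and escapeStringConstant, not the Phoenix implementations, and the manual " AND " join stands in for the loop in buildFilter:

    // Simplified mirror of the filter compilation above, runnable on its own.
    object FilterSketch {
      // Stand-in for SchemaUtil.getEscapedArgument: wrap the name in double quotes
      private def escapeKey(key: String): String = "\"" + key + "\""

      // Stand-in for escapeStringConstant: double any embedded single quotes
      private def compileValue(value: Any): Any = value match {
        case s: String => s"'${s.replace("'", "''")}'"
        case other => other
      }

      def main(args: Array[String]): Unit = {
        // EqualTo("first name", "foo") AND GreaterThan("salary", 10000)
        val predicate = s"${escapeKey("first name")} = ${compileValue("foo")}" +
          s" AND ${escapeKey("salary")} > ${compileValue(10000)}"
        println(predicate)  // prints: "first name" = 'foo' AND "salary" > 10000
      }
    }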