Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 f3056ba67 -> 517cbb78f
PHOENIX-2036 PhoenixConfigurationUtil should provide a pre-normalize table name to PhoenixRuntime

Update phoenix-spark to follow the same normalization requirement.

Conflicts:
	phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/517cbb78
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/517cbb78
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/517cbb78

Branch: refs/heads/4.x-HBase-0.98
Commit: 517cbb78fb142b95657856dd07b763acea19ecfb
Parents: f3056ba
Author: Josh Mahonin <jmaho...@gmail.com>
Authored: Mon Jul 6 19:39:31 2015 -0400
Committer: Josh Mahonin <jmaho...@apache.org>
Committed: Mon Jul 6 20:08:29 2015 -0400

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/setup.sql        |  4 +-
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 58 ++++++++++++--------
 .../org/apache/phoenix/spark/PhoenixRDD.scala   | 24 +++-----
 3 files changed, 46 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/517cbb78/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index 40157a2..154a996 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -32,4 +32,6 @@ CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[]
 UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
 CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
 UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
\ No newline at end of file
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
+CREATE TABLE CUSTOM_ENTITY."z02"(id BIGINT NOT NULL PRIMARY KEY)
+UPSERT INTO CUSTOM_ENTITY."z02" (id) VALUES(1)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/517cbb78/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index fb7d869..2889464 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -20,9 +20,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility}
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
 import org.apache.phoenix.query.BaseTest
-import org.apache.phoenix.schema.ColumnNotFoundException
+import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
 import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.util.ColumnInfo
+import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
 import org.apache.spark.sql.{SaveMode, execution, SQLContext}
 import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
 import org.apache.spark.{SparkConf, SparkContext}
@@ -96,23 +96,6 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     PhoenixSparkITHelper.doTeardown
   }
 
-  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
-    val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
-
-    query + (predicate match {
-      case Some(p: String) => " WHERE " + p
-      case _ => ""
-    })
-  }
-
-  test("Can create valid SQL") {
-    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
-      conf = hbaseConfiguration)
-
-    rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
-      equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
-  }
-
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
@@ -153,7 +136,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   test("Can create schema RDD and execute query on case sensitive table (no config)") {
     val sqlContext = new SQLContext(sc)
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+    val df1 = sqlContext.phoenixTableAsDataFrame(
+      SchemaUtil.getEscapedArgument("table3"),
+      Array("id", "col1"),
       zkUrl = Some(quorumAddress))
 
     df1.registerTempTable("table3")
@@ -190,10 +175,12 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("Using a predicate referring to a non-existent column should fail") {
-    intercept[RuntimeException] {
+    intercept[Exception] {
       val sqlContext = new SQLContext(sc)
 
-      val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+      val df1 = sqlContext.phoenixTableAsDataFrame(
+        SchemaUtil.getEscapedArgument("table3"),
+        Array("id", "col1"),
         predicate = Some("foo = bar"),
         conf = hbaseConfiguration)
 
@@ -209,7 +196,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   test("Can create schema RDD with predicate that will never match") {
     val sqlContext = new SQLContext(sc)
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+    val df1 = sqlContext.phoenixTableAsDataFrame(
+      SchemaUtil.getEscapedArgument("table3"),
+      Array("id", "col1"),
       predicate = Some("\"id\" = -1"),
       conf = hbaseConfiguration)
 
@@ -435,4 +424,27 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     // Verify the arrays are equal
     sqlArray shouldEqual dataSet(0)._2
   }
+
+  test("Can read from table with schema and escaped table name") {
+    // Manually escape
+    val rdd1 = sc.phoenixTableAsRDD(
+      "CUSTOM_ENTITY.\"z02\"",
+      Seq("ID"),
+      conf = hbaseConfiguration)
+
+    var count = rdd1.count()
+
+    count shouldEqual 1L
+
+    // Use SchemaUtil
+    val rdd2 = sc.phoenixTableAsRDD(
+      SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
+      Seq("ID"),
+      conf = hbaseConfiguration)
+
+    count = rdd2.count()
+
+    count shouldEqual 1L
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/517cbb78/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 773026d..427fb24 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -63,30 +63,22 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
     }
   }
 
-  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
-    val query = "SELECT %s FROM \"%s\"".format(
-      if (columns.isEmpty) "*" else columns.map(f => "\"" + f + "\"").mkString(", "),
-      table
-    )
-
-    query + (predicate match {
-      case Some(p: String) if p.length > 0 => " WHERE " + p
-      case _ => ""
-    })
-  }
-
   def getPhoenixConfiguration: Configuration = {
     // This is just simply not serializable, so don't try, but clone it because
     // PhoenixConfigurationUtil mutates it.
     val config = HBaseConfiguration.create(conf)
 
-    PhoenixConfigurationUtil.setInputQuery(config, buildSql(table, columns, predicate))
+    PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
+    PhoenixConfigurationUtil.setInputTableName(config, table)
+
     if(!columns.isEmpty) {
-      PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+      PhoenixConfigurationUtil.setSelectColumnNames(config, columns.toArray)
+    }
+
+    if(predicate.isDefined) {
+      PhoenixConfigurationUtil.setInputTableConditions(config, predicate.get)
     }
-    PhoenixConfigurationUtil.setInputTableName(config, table)
-    PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
 
     // Override the Zookeeper URL if present. Throw exception if no address given.
     zkUrl match {
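
For callers of phoenix-spark, the practical effect of this change is that table names are no longer normalized on their behalf: a case-sensitive or schema-qualified table must be escaped before it is handed to phoenixTableAsRDD / phoenixTableAsDataFrame, for example via SchemaUtil, as the updated tests above do. A minimal usage sketch follows; the ZooKeeper quorum address and the standalone SparkContext setup are illustrative assumptions, while the CUSTOM_ENTITY."z02" table is the one created in setup.sql above.

    import org.apache.phoenix.util.SchemaUtil
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.phoenix.spark._   // adds phoenixTableAsRDD to SparkContext

    object EscapedTableNameExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("phoenix-spark-escaping"))

        // The table name is forwarded as-is to PhoenixConfigurationUtil.setInputTableName,
        // so escape the case-sensitive part ("z02") before passing it in.
        val rdd = sc.phoenixTableAsRDD(
          SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
          Seq("ID"),
          zkUrl = Some("localhost:2181"))  // illustrative quorum address, not from the commit

        println(rdd.count())

        sc.stop()
      }
    }

The same escaping applies on the DataFrame side, where the updated tests pass SchemaUtil.getEscapedArgument("table3") instead of the raw lower-case name.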