PHOENIX-3792 Provide way to skip normalization of column names in phoenix-spark integration
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa5281eb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa5281eb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa5281eb

Branch: refs/heads/4.x-HBase-0.98
Commit: fa5281ebd119d24c9f0a3d274376a774c5334a37
Parents: 9e7a997
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Fri Apr 21 11:55:12 2017 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Fri Apr 21 11:55:12 2017 +0530

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/globalSetup.sql  |  1 +
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 27 ++++++++++++++++++--
 .../phoenix/spark/DataFrameFunctions.scala      | 19 +++++++++++---
 .../apache/phoenix/spark/DefaultSource.scala    |  2 +-
 4 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index dc24da7..7ac0039 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -17,6 +17,7 @@
 CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+CREATE TABLE table3 (id BIGINT NOT NULL PRIMARY KEY, table3_id BIGINT, "t2col1" VARCHAR)
 UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index bb8c302..528b33a 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -20,15 +20,38 @@ import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext, SaveMode}
 import org.joda.time.DateTime
-
+import org.apache.spark.{SparkConf, SparkContext}
 import scala.collection.mutable.ListBuffer
-
+import org.apache.hadoop.conf.Configuration
 
 /**
  * Note: If running directly from an IDE, these are the recommended VM parameters:
  * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
  */
 class PhoenixSparkIT extends AbstractPhoenixSparkIT {
+  test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.createDataFrame(
+      Seq(
+        (1, 1, "test_child_1"),
+        (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
+    df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress), skipNormalizingIdentifier = true)
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE3")
+
+    val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
+
+  }
+
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index ddf4fab..92f4c58 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -24,13 +24,16 @@ import scala.collection.JavaConversions._
 
 
 class DataFrameFunctions(data: DataFrame) extends Serializable {
-
+  def saveToPhoenix(parameters: Map[String, String]): Unit = {
+    saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
+      skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
+  }
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
-                    zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
-
+                    zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
     // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
-    val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+    val fieldArray = getFieldArray(skipNormalizingIdentifier, data)
+
     // Create a configuration object to use for saving
     @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
 
@@ -61,4 +64,12 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
       outConfig
     )
   }
+
+  def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
+    if (skipNormalizingIdentifier) {
+      data.schema.fieldNames.map(x => x)
+    } else {
+      data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 743d196..e000b74 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -44,7 +44,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
     verifyParameters(parameters)
 
     // Save the DataFrame to Phoenix
-    data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"))
+    data.saveToPhoenix(parameters)
 
     // Return a relation of the saved data
    createRelation(sqlContext, parameters)
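
----------------------------------------------------------------------
For illustration only (not part of the commit): a minimal sketch of how the new
skipNormalizingIdentifier flag might be used from Spark once this change is in place.
The table name "MY_TABLE", the quorum "localhost:2181", and the column names are
placeholders, and the target table is assumed to already exist in Phoenix with a
case-sensitive "col1" column.

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.phoenix.spark._   // adds saveToPhoenix to DataFrame via implicits

// Assumes an existing SparkContext named sc.
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(Seq((1L, "one"), (2L, "two"))).toDF("ID", "col1")

// Implicit API: keep the DataFrame field names exactly as given instead of normalizing
// them, so "col1" maps to the quoted, case-sensitive "col1" column in Phoenix.
df.saveToPhoenix("MY_TABLE", zkUrl = Some("localhost:2181"), skipNormalizingIdentifier = true)

// DataFrameWriter API: per the DefaultSource change above, the mere presence of the
// "skipNormalizingIdentifier" key enables the skip (parameters.contains(...)).
df.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)
  .option("table", "MY_TABLE")
  .option("zkUrl", "localhost:2181")
  .option("skipNormalizingIdentifier", "true")
  .save()
----------------------------------------------------------------------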