PHOENIX-3792 Provide way to skip normalization of column names in phoenix-spark integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa5281eb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa5281eb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa5281eb

Branch: refs/heads/4.x-HBase-0.98
Commit: fa5281ebd119d24c9f0a3d274376a774c5334a37
Parents: 9e7a997
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Fri Apr 21 11:55:12 2017 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Fri Apr 21 11:55:12 2017 +0530

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/globalSetup.sql  |  1 +
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 27 ++++++++++++++++++--
 .../phoenix/spark/DataFrameFunctions.scala      | 19 +++++++++++---
 .../apache/phoenix/spark/DefaultSource.scala    |  2 +-
 4 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
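
In short, saveToPhoenix gains an optional skipNormalizingIdentifier flag so that
DataFrame column names are passed to Phoenix as-is instead of being run through
SchemaUtil.normalizeIdentifier (which upper-cases unquoted names). A minimal
usage sketch of the new flag; the SparkContext sc, ZooKeeper quorum and sample
values below are placeholders, not part of this commit:

  import org.apache.phoenix.spark._
  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  // "t2col1" is deliberately lower case so it matches a quoted,
  // case-sensitive Phoenix column.
  val df = sqlContext.createDataFrame(Seq((1L, 1L, "row_1"))).toDF("ID", "TABLE3_ID", "t2col1")

  // Without skipNormalizingIdentifier = true, "t2col1" would be normalized
  // to T2COL1 and fail to match the quoted column.
  df.saveToPhoenix("TABLE3", zkUrl = Some("localhost:2181"), skipNormalizingIdentifier = true)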


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index dc24da7..7ac0039 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -17,6 +17,7 @@
 CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+CREATE TABLE table3 (id BIGINT NOT NULL PRIMARY KEY, table3_id BIGINT, "t2col1" VARCHAR)
 UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index bb8c302..528b33a 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -20,15 +20,38 @@ import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext, SaveMode}
 import org.joda.time.DateTime
-
+import org.apache.spark.{SparkConf, SparkContext}
 import scala.collection.mutable.ListBuffer
-
+import org.apache.hadoop.conf.Configuration
 /**
   * Note: If running directly from an IDE, these are the recommended VM parameters:
   * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
   */
 class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
+  test("Can persist data with case senstive columns (like in avro schema) 
using 'DataFrame.saveToPhoenix'") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.createDataFrame(
+      Seq(
+        (1, 1, "test_child_1"),
+        (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
+    df.saveToPhoenix("TABLE3", zkUrl = 
Some(quorumAddress),skipNormalizingIdentifier=true)
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE3")
+
+    val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
+
+  }
+  
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index ddf4fab..92f4c58 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -24,13 +24,16 @@ import scala.collection.JavaConversions._
 
 
 class DataFrameFunctions(data: DataFrame) extends Serializable {
-
+  def saveToPhoenix(parameters: Map[String, String]): Unit = {
+               saveToPhoenix(parameters("table"), zkUrl = 
parameters.get("zkUrl"), tenantId = parameters.get("TenantId"), 
+               
skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
+   }
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
-                    zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
-
+                    zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
 
     // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
-    val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+    val fieldArray = getFieldArray(skipNormalizingIdentifier, data)
+    
 
     // Create a configuration object to use for saving
     @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
 @@ -61,4 +64,12 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
       outConfig
     )
   }
+
+  def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
+    if (skipNormalizingIdentifier) {
+      data.schema.fieldNames.map(x => x)
+    } else {
+      data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+    }
+  }
 }

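For context: SchemaUtil.normalizeIdentifier upper-cases identifiers that are not
enclosed in double quotes, which is why a DataFrame field such as t2col1 was
previously mapped to T2COL1 and could not target a case-sensitive (quoted)
Phoenix column. A short illustration of the behavior that getFieldArray now
makes optional (expected results shown as comments, per Phoenix's identifier
normalization rules):

  import org.apache.phoenix.util.SchemaUtil

  SchemaUtil.normalizeIdentifier("t2col1")       // "T2COL1"  - unquoted names are upper-cased
  SchemaUtil.normalizeIdentifier("\"t2col1\"")   // "t2col1"  - quoted names keep their case
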
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa5281eb/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 743d196..e000b74 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -44,7 +44,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
     verifyParameters(parameters)
 
     // Save the DataFrame to Phoenix
-    data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), 
tenantId = parameters.get("TenantId"))
+    data.saveToPhoenix(parameters)
 
     // Return a relation of the saved data
     createRelation(sqlContext, parameters)

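Because DefaultSource now forwards the full options map to
DataFrameFunctions.saveToPhoenix, the flag can also be supplied through Spark's
generic DataFrame writer. A sketch assuming the Spark 1.4+ DataFrameWriter API
and an existing DataFrame df whose column names already match the target
table's case-sensitive columns (table name and quorum are placeholders):

  import org.apache.spark.sql.SaveMode

  df.write
    .format("org.apache.phoenix.spark")
    .mode(SaveMode.Overwrite)
    .option("table", "TABLE3")
    .option("zkUrl", "localhost:2181")
    // The mere presence of the key enables the behavior:
    // saveToPhoenix checks parameters.contains("skipNormalizingIdentifier").
    .option("skipNormalizingIdentifier", "true")
    .save()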