PHOENIX-1968: Should support saving arrays
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/31a1ca6c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/31a1ca6c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/31a1ca6c

Branch: refs/heads/calcite
Commit: 31a1ca6caefb45430969fc7c0d28b50bb515c605
Parents: db90196
Author: ravimagham <ravimag...@apache.org>
Authored: Thu Jun 11 11:50:21 2015 -0700
Committer: ravimagham <ravimag...@apache.org>
Committed: Thu Jun 11 11:50:21 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 21 ++++++++++++++++
 .../phoenix/spark/PhoenixRecordWritable.scala   | 25 ++++++++++++++++----
 2 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a1ca6c/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 42e8676..5f256e6 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -415,4 +415,25 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     results.toList shouldEqual checkResults
   }
+
+  test("Can save arrays back to phoenix") {
+    val dataSet = List((2L, Array("String1", "String2", "String3")))
+
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "ARRAY_TEST_TABLE",
+        Seq("ID","VCARRAY"),
+        zkUrl = Some(quorumAddress)
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT VCARRAY FROM ARRAY_TEST_TABLE WHERE ID = 2")
+    rs.next()
+    val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
+
+    // Verify the arrays are equal
+    sqlArray shouldEqual dataSet(0)._2
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a1ca6c/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 67e0bd2..3977657 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -16,11 +16,12 @@ package org.apache.phoenix.spark

 import java.sql.{PreparedStatement, ResultSet}
 import org.apache.hadoop.mapreduce.lib.db.DBWritable
 import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder
-import org.apache.phoenix.schema.types.{PDate, PhoenixArray}
+import org.apache.phoenix.schema.types.{PDataType, PDate, PhoenixArray}
 import org.joda.time.DateTime
 import scala.collection.{immutable, mutable}
 import scala.collection.JavaConversions._
+
 class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
   val upsertValues = mutable.ArrayBuffer[Any]()
   val resultMap = mutable.Map[String, AnyRef]()
@@ -44,13 +45,27 @@ class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
     upsertValues.zip(columns).zipWithIndex.foreach {
       case ((v, c), i) => {
         if (v != null) {
+          // Both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
+          // Can override any other types here as needed
           val (finalObj, finalType) = v match {
-            case dt: DateTime => (new java.sql.Date(dt.getMillis), PDate.INSTANCE.getSqlType)
-            case d: java.util.Date => (new java.sql.Date(d.getTime), PDate.INSTANCE.getSqlType)
-            case _ => (v, c.getSqlType)
+            case dt: DateTime => (new java.sql.Date(dt.getMillis), PDate.INSTANCE)
+            case d: java.util.Date => (new java.sql.Date(d.getTime), PDate.INSTANCE)
+            case _ => (v, c.getPDataType)
+          }
+
+          // Save as array or object
+          finalObj match {
+            case obj: Array[AnyRef] => {
+              // Create a java.sql.Array, need to lookup the base sql type name
+              val sqlArray = statement.getConnection.createArrayOf(
+                PDataType.arrayBaseType(finalType).getSqlTypeName,
+                obj
+              )
+              statement.setArray(i + 1, sqlArray)
+            }
+            case _ => statement.setObject(i + 1, finalObj)
           }
-          statement.setObject(i + 1, finalObj, finalType)
         } else {
           statement.setNull(i + 1, c.getSqlType)
         }
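For anyone trying out the new array support, below is a minimal, self-contained sketch of a Spark job that saves an array column through saveToPhoenix. It mirrors the integration test above; the object name, app name, ZooKeeper quorum address, and the CREATE TABLE statement in the comment are assumptions for illustration, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.phoenix.spark._  // adds saveToPhoenix to RDDs of tuples

object SaveArraysExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("phoenix-save-arrays"))

    // Assumes the target table already exists, e.g.:
    //   CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
    val dataSet = List(
      (1L, Array("String1", "String2")),
      (2L, Array("String3"))
    )

    // Each tuple element maps positionally to a column in the Seq below
    sc.parallelize(dataSet)
      .saveToPhoenix(
        "ARRAY_TEST_TABLE",
        Seq("ID", "VCARRAY"),
        zkUrl = Some("localhost:2181")  // hypothetical quorum; substitute your own
      )

    sc.stop()
  }
}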
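Under the hood, the PhoenixRecordWritable change relies on standard JDBC array binding: resolve the base SQL type name of the array column, build a java.sql.Array via Connection.createArrayOf, and bind it with setArray. Here is a bare-JDBC sketch of that same mechanism, assuming a local Phoenix quorum and the ARRAY_TEST_TABLE schema from the test; it is not code from this patch.

import java.sql.DriverManager

object JdbcArrayUpsertSketch {
  def main(args: Array[String]): Unit = {
    // "jdbc:phoenix:<quorum>" is the Phoenix JDBC URL form; the host is an assumption
    val conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")
    try {
      val stmt = conn.prepareStatement(
        "UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (?, ?)")
      stmt.setLong(1, 3L)

      // "VARCHAR" is the base type name of a VARCHAR[] column; this is the value
      // PDataType.arrayBaseType(finalType).getSqlTypeName resolves in the patch
      val sqlArray = conn.createArrayOf("VARCHAR", Array[AnyRef]("String4", "String5"))
      stmt.setArray(2, sqlArray)

      stmt.executeUpdate()
      conn.commit()  // Phoenix connections do not auto-commit by default
    } finally {
      conn.close()
    }
  }
}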