This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 29422f1  PHOENIX-6559 spark connector access to SmallintArray / UnsignedSmallintArray columns
29422f1 is described below

commit 29422f18605d54daee047dd851fe770af01e3ded
Author: alferca <alfe...@ific.uv.es>
AuthorDate: Thu Sep 23 17:34:55 2021 +0200

    PHOENIX-6559 spark connector access to SmallintArray / UnsignedSmallintArray columns
---
 .../src/it/resources/globalSetup.sql               |  2 +
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 44 +++++++++++++++++++++-
 .../org/apache/phoenix/spark/SparkSchemaUtil.scala |  2 +-
 .../datasources/jdbc/PhoenixJdbcDialect.scala      |  3 +-
 4 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/phoenix-spark-base/src/it/resources/globalSetup.sql b/phoenix-spark-base/src/it/resources/globalSetup.sql
index 2f6e9ed..8a3a4c2 100644
--- a/phoenix-spark-base/src/it/resources/globalSetup.sql
+++ b/phoenix-spark-base/src/it/resources/globalSetup.sql
@@ -37,6 +37,8 @@ CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY I
 UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
 CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[])
 UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_SHORT_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, SHORTARRAY SMALLINT[])
+UPSERT INTO ARRAY_SHORT_TEST_TABLE (ID, SHORTARRAY) VALUES (1, ARRAY[1, 2, 3])
 CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[])
 CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
 UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 58910ce..e9a3274 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -17,13 +17,13 @@ import java.sql.DriverManager
 import java.util.Date
 
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.schema.types.{PVarchar, PSmallintArray, PUnsignedSmallintArray}
 import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource}
 import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
 import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.SparkException
-import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType, ShortType}
 import org.apache.spark.sql.{Row, SaveMode}
 
 import scala.collection.mutable
@@ -99,6 +99,18 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     catalystSchema shouldEqual expected
   }
 
+  test("Can convert arrays of Short type in Phoenix schema") {
+    val phoenixSchema = List(
+      new ColumnInfo("arrayshortColumn", PSmallintArray.INSTANCE.getSqlType)
+    )
+
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+    val expected = new StructType(List(StructField("arrayshortColumn", ArrayType(ShortType, true), nullable = true)).toArray)
+
+    catalystSchema shouldEqual expected
+  }
+
   test("Can create schema RDD and execute query") {
     val df1 = spark.sqlContext.read.format("phoenix")
       .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
@@ -660,6 +672,34 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     byteArray shouldEqual dataSet(0).get(1)
   }
 
+  test("Can save arrays of Short type back to phoenix") {
+    val dataSet = List(Row(2L, Array(1.toShort, 2.toShort, 3.toShort)))
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("SHORTARRAY", ArrayType(ShortType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_SHORT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT SHORTARRAY FROM ARRAY_SHORT_TEST_TABLE WHERE ID = 2")
+    rs.next()
+    val shortArray = rs.getArray(1).getArray().asInstanceOf[Array[Short]]
+
+    // Verify the arrays are equal
+    shortArray shouldEqual dataSet(0).get(1)
+  }
+
   test("Can save binary types back to phoenix") {
     val dataSet = List(Row(2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
 
diff --git a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index 363acf8..26461ac 100644
--- a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++ b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -79,7 +79,7 @@ object SparkSchemaUtil {
     case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
     case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
     case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
-    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(ShortType, containsNull = true)
     case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
     case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
     case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
diff --git a/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
index f89a451..2bc0b88 100644
--- a/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
+++ b/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.jdbc
 
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType}
+import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType, ShortType}
 
 private object PhoenixJdbcDialect  extends JdbcDialect {
 
@@ -32,6 +32,7 @@ private object PhoenixJdbcDialect  extends JdbcDialect {
     case StringType => Some(JdbcType("VARCHAR", java.sql.Types.VARCHAR))
     case BinaryType => Some(JdbcType("BINARY(" + dt.defaultSize + ")", java.sql.Types.BINARY))
     case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT))
+    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
     case _ => None
   }
 

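For reference, a minimal usage sketch of what this commit enables, mirroring the new integration test above. This is an illustration, not part of the patch: it assumes a running Phoenix cluster, and the Zookeeper quorum string, the local SparkSession setup, and the ARRAY_SHORT_TEST_TABLE table (created in globalSetup.sql) are stand-ins for a real deployment.

import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
import org.apache.spark.sql.types.{ArrayType, LongType, ShortType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

object SmallintArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("smallint-array").getOrCreate()
    val zkQuorum = "localhost:2181" // placeholder Zookeeper quorum

    // With this change, a Phoenix SMALLINT[] column maps to ArrayType(ShortType)
    // in SparkSchemaUtil (previously ArrayType(IntegerType)).
    val schema = StructType(Seq(
      StructField("ID", LongType, nullable = false),
      StructField("SHORTARRAY", ArrayType(ShortType))))
    val rows = spark.sparkContext.parallelize(Seq(Row(1L, Array(1.toShort, 2.toShort, 3.toShort))))
    val df = spark.sqlContext.createDataFrame(rows, schema)

    // Write: ShortType now has a SMALLINT mapping in PhoenixJdbcDialect.
    df.write.format("phoenix")
      .options(Map("table" -> "ARRAY_SHORT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> zkQuorum))
      .mode(SaveMode.Overwrite)
      .save()

    // Read back: the inferred Catalyst schema for SHORTARRAY is ArrayType(ShortType).
    spark.sqlContext.read.format("phoenix")
      .options(Map("table" -> "ARRAY_SHORT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> zkQuorum))
      .load()
      .printSchema()

    spark.stop()
  }
}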