This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push: new 29422f1 PHOENIX-6559 spark connector access to SmallintArray / UnsignedSmallintArray columns 29422f1 is described below commit 29422f18605d54daee047dd851fe770af01e3ded Author: alferca <alfe...@ific.uv.es> AuthorDate: Thu Sep 23 17:34:55 2021 +0200 PHOENIX-6559 spark connector access to SmallintArray / UnsignedSmallintArray columns --- .../src/it/resources/globalSetup.sql | 2 + .../org/apache/phoenix/spark/PhoenixSparkIT.scala | 44 +++++++++++++++++++++- .../org/apache/phoenix/spark/SparkSchemaUtil.scala | 2 +- .../datasources/jdbc/PhoenixJdbcDialect.scala | 3 +- 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/phoenix-spark-base/src/it/resources/globalSetup.sql b/phoenix-spark-base/src/it/resources/globalSetup.sql index 2f6e9ed..8a3a4c2 100644 --- a/phoenix-spark-base/src/it/resources/globalSetup.sql +++ b/phoenix-spark-base/src/it/resources/globalSetup.sql @@ -37,6 +37,8 @@ CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY I UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3]) CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[]) UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3]) +CREATE TABLE ARRAY_SHORT_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, SHORTARRAY SMALLINT[]) +UPSERT INTO ARRAY_SHORT_TEST_TABLE (ID, SHORTARRAY) VALUES (1, ARRAY[1, 2, 3]) CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[]) CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY)) UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP)) diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index 58910ce..e9a3274 100644 --- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -17,13 +17,13 @@ import java.sql.DriverManager import java.util.Date import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil -import org.apache.phoenix.schema.types.PVarchar +import org.apache.phoenix.schema.types.{PVarchar, PSmallintArray, PUnsignedSmallintArray} import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource} import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter import org.apache.phoenix.util.{ColumnInfo, SchemaUtil} import org.apache.spark.SparkException -import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType, ShortType} import org.apache.spark.sql.{Row, SaveMode} import scala.collection.mutable @@ -99,6 +99,18 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { catalystSchema shouldEqual expected } + test("Can convert arrays of Short type in Phoenix schema") { + val phoenixSchema = List( + new ColumnInfo("arrayshortColumn", PSmallintArray.INSTANCE.getSqlType) + ) + + val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema) + + val expected = new StructType(List(StructField("arrayshortColumn", ArrayType(ShortType, true), nullable = true)).toArray) + + catalystSchema shouldEqual expected + } + test("Can create schema RDD and execute query") { val df1 = spark.sqlContext.read.format("phoenix") .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load @@ -660,6 +672,34 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { byteArray shouldEqual dataSet(0).get(1) } + test("Can save arrays of Short type back to phoenix") { + val dataSet = List(Row(2L, Array(1.toShort, 2.toShort, 3.toShort))) + + val schema = StructType( + Seq(StructField("ID", LongType, nullable = false), + StructField("SHORTARRAY", ArrayType(ShortType)))) + + val rowRDD = spark.sparkContext.parallelize(dataSet) + + // Apply the schema to the RDD. + val df = spark.sqlContext.createDataFrame(rowRDD, schema) + + df.write + .format("phoenix") + .options(Map("table" -> "ARRAY_SHORT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() + + // Load the results back + val stmt = conn.createStatement() + val rs = stmt.executeQuery("SELECT SHORTARRAY FROM ARRAY_SHORT_TEST_TABLE WHERE ID = 2") + rs.next() + val shortArray = rs.getArray(1).getArray().asInstanceOf[Array[Short]] + + // Verify the arrays are equal + shortArray shouldEqual dataSet(0).get(1) + } + test("Can save binary types back to phoenix") { val dataSet = List(Row(2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2)))) diff --git a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala index 363acf8..26461ac 100644 --- a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala +++ b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala @@ -79,7 +79,7 @@ object SparkSchemaUtil { case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true) case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true) case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true) - case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true) + case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(ShortType, containsNull = true) case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true) case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true) case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true) diff --git a/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala index f89a451..2bc0b88 100644 --- a/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala +++ b/phoenix-spark-base/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType} -import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType} +import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType, ShortType} private object PhoenixJdbcDialect extends JdbcDialect { @@ -32,6 +32,7 @@ private object PhoenixJdbcDialect extends JdbcDialect { case StringType => Some(JdbcType("VARCHAR", java.sql.Types.VARCHAR)) case BinaryType => Some(JdbcType("BINARY(" + dt.defaultSize + ")", java.sql.Types.BINARY)) case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT)) + case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT)) case _ => None }