Re: Problem with arrays in phoenix-spark

2015-11-30 Thread Josh Mahonin
Hi David,

Thanks for the bug report and the proposed patch. Please file a JIRA and
we'll take the discussion there.

Josh

On Mon, Nov 30, 2015 at 1:01 PM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

> Hi,
>
> I've recently come across some behaviour that looks buggy when working
> with phoenix-spark and arrays.
>
> Take a look at those unit tests:
>
>   test("Can save arrays from custom dataframes back to phoenix") {
> val dataSet = List(Row(2L, Array("String1", "String2", "String3")))
>
> val sqlContext = new SQLContext(sc)
>
> val schema = StructType(
> Seq(StructField("ID", LongType, nullable = false),
> StructField("VCARRAY", ArrayType(StringType
>
> val rowRDD = sc.parallelize(dataSet)
>
> // Apply the schema to the RDD.
> val df = sqlContext.createDataFrame(rowRDD, schema)
>
> df.write
>   .format("org.apache.phoenix.spark")
>   .options(Map("table" -> "ARRAY_TEST_TABLE", "zkUrl" ->
> quorumAddress))
>   .mode(SaveMode.Overwrite)
>   .save()
>   }
>
>   test("Can save arrays of AnyVal type back to phoenix") {
> val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
>
> sc
>   .parallelize(dataSet)
>   .saveToPhoenix(
> "ARRAY_ANYVAL_TEST_TABLE",
> Seq("ID", "INTARRAY", "BIGINTARRAY"),
> zkUrl = Some(quorumAddress)
>   )
>
> // Load the results back
> val stmt = conn.createStatement()
> val rs = stmt.executeQuery("SELECT INTARRAY, BIGINTARRAY FROM
> ARRAY_ANYVAL_TEST_TABLE WHERE ID = 2")
> rs.next()
> val intArray = rs.getArray(1).getArray().asInstanceOf[Array[Int]]
> val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]]
>
> // Verify the arrays are equal
> intArray shouldEqual dataSet(0)._2
> longArray shouldEqual dataSet(0)._3
>   }
>
> Both fail with ClassCastExceptions.
>
> In the attached patch I've proposed a solution. The tricky part is
> Array[Byte], as it would be the same for both VARBINARY and TINYINT[].
>
> Let me know if I should create an issue for this, and whether my solution
> satisfies you.
>
> Regards
> Dawid Wysakowicz
>


Problem with arrays in phoenix-spark

2015-11-30 Thread Dawid Wysakowicz
Hi,

I've recently come across some behaviour that looks buggy when working
with phoenix-spark and arrays.

Take a look at those unit tests:

  test("Can save arrays from custom dataframes back to phoenix") {
val dataSet = List(Row(2L, Array("String1", "String2", "String3")))

val sqlContext = new SQLContext(sc)

val schema = StructType(
Seq(StructField("ID", LongType, nullable = false),
StructField("VCARRAY", ArrayType(StringType

val rowRDD = sc.parallelize(dataSet)

// Apply the schema to the RDD.
val df = sqlContext.createDataFrame(rowRDD, schema)

df.write
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "ARRAY_TEST_TABLE", "zkUrl" -> quorumAddress))
  .mode(SaveMode.Overwrite)
  .save()
  }

  test("Can save arrays of AnyVal type back to phoenix") {
val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L)))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
"ARRAY_ANYVAL_TEST_TABLE",
Seq("ID", "INTARRAY", "BIGINTARRAY"),
zkUrl = Some(quorumAddress)
  )

// Load the results back
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT INTARRAY, BIGINTARRAY FROM
ARRAY_ANYVAL_TEST_TABLE WHERE ID = 2")
rs.next()
val intArray = rs.getArray(1).getArray().asInstanceOf[Array[Int]]
val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]]

// Verify the arrays are equal
intArray shouldEqual dataSet(0)._2
longArray shouldEqual dataSet(0)._3
  }

Both fail with ClassCastExceptions.

In the attached patch I've proposed a solution. The tricky part is
Array[Byte], as it would be the same for both VARBINARY and TINYINT[].
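
One way to disambiguate is to consult the target column's declared Phoenix
type instead of the value's runtime class. The following is only a sketch of
that idea, not the attached patch; the toJdbcValue helper is hypothetical,
while ColumnInfo, PDataType and PVarbinary are the actual Phoenix classes:

  import org.apache.phoenix.schema.types.{PDataType, PVarbinary}
  import org.apache.phoenix.util.ColumnInfo

  // Hypothetical helper: pick a JDBC-friendly representation for a value
  // based on the Phoenix type of the column it is being written to.
  def toJdbcValue(value: Any, column: ColumnInfo): Any = value match {
    case bytes: Array[Byte]
      if PDataType.fromTypeId(column.getSqlType) == PVarbinary.INSTANCE =>
      // VARBINARY: pass the raw bytes straight through
      bytes
    case bytes: Array[Byte] =>
      // TINYINT[]: box each element so it can go through createArrayOf()
      bytes.map(Byte.box(_))
    case arr: Array[_] =>
      // Other primitive arrays (Int, Long, ...) need boxing as well
      arr.map(_.asInstanceOf[AnyRef])
    case other => other
  }

The attached patch handles the remaining details; the sketch only shows
where the type information could come from.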

Let me know if I should create an issue for this, and whether my solution
satisfies you.

Regards
Dawid Wysakowicz
From 5d24874cd0b2d15618843ada221634fa2a371d35 Mon Sep 17 00:00:00 2001
From: dawid 
Date: Mon, 30 Nov 2015 18:54:40 +0100
Subject: [PATCH] Phoenix-spark arrays

---
 phoenix-spark/src/it/resources/setup.sql   |  2 +
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 48 +-
 .../phoenix/spark/PhoenixRecordWritable.scala  | 27 
 3 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index 154a996..e97148c 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -30,6 +30,8 @@ UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
 UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
 CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
 UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
+CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY INTEGER[], BIGINTARRAY BIGINT[])
+UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
 CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
 UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
 CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index e1c9df4..86769f6 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -23,8 +23,8 @@ import org.apache.phoenix.query.BaseTest
 import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
 import org.apache.phoenix.schema.types.PVarchar
 import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
-import org.apache.spark.sql.{SaveMode, execution, SQLContext}
-import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
+import org.apache.spark.sql.{Row, SaveMode, execution, SQLContext}
+import org.apache.spark.sql.types._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.joda.time.DateTime
 import org.scalatest._
@@ -448,4 +448,48 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     count shouldEqual 1L
 
   }
+
+  test("Can save arrays from custom dataframes back to phoenix") {
+    val dataSet = List(Row(2L, Array("String1", "String2", "String3")))
+
+    val sqlContext = new SQLContext(sc)
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("VCARRAY", ArrayType(StringType))))
+
+    val rowRDD = sc.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+