This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push: new f86b97b PHOENIX-5059 Use the Datasource v2 api in the spark connector f86b97b is described below commit f86b97b00d3935a699eaa8fd122463e468b42cd4 Author: Thomas D'Silva <tdsi...@apache.org> AuthorDate: Tue Dec 11 14:59:39 2018 -0800 PHOENIX-5059 Use the Datasource v2 api in the spark connector --- .../phoenix/end2end/salted/BaseSaltedTableIT.java | 6 +- phoenix-spark/pom.xml | 8 + .../java/org/apache/phoenix/spark/OrderByIT.java | 92 ++-- .../java/org/apache/phoenix/spark/SparkUtil.java | 25 +- phoenix-spark/src/it/resources/globalSetup.sql | 6 +- .../phoenix/spark/AbstractPhoenixSparkIT.scala | 12 +- .../org/apache/phoenix/spark/PhoenixSparkIT.scala | 541 +++++++++++---------- .../spark/PhoenixSparkITTenantSpecific.scala | 18 +- .../spark/datasource/v2/PhoenixDataSource.java | 82 ++++ .../v2/reader/PhoenixDataSourceReadOptions.java | 51 ++ .../v2/reader/PhoenixDataSourceReader.java | 201 ++++++++ .../v2/reader/PhoenixInputPartition.java | 44 ++ .../v2/reader/PhoenixInputPartitionReader.java | 168 +++++++ .../v2/writer/PhoenixDataSourceWriteOptions.java | 109 +++++ .../datasource/v2/writer/PhoenixDataWriter.java | 100 ++++ .../v2/writer/PhoenixDataWriterFactory.java | 19 + .../v2/writer/PhoenixDatasourceWriter.java | 34 ++ ...org.apache.spark.sql.sources.DataSourceRegister | 1 + .../apache/phoenix/spark/ConfigurationUtil.scala | 1 + .../apache/phoenix/spark/DataFrameFunctions.scala | 2 +- .../org/apache/phoenix/spark/DefaultSource.scala | 1 + ...lation.scala => FilterExpressionCompiler.scala} | 109 ++--- .../org/apache/phoenix/spark/PhoenixRDD.scala | 61 +-- .../phoenix/spark/PhoenixRecordWritable.scala | 2 +- .../org/apache/phoenix/spark/PhoenixRelation.scala | 70 +-- .../apache/phoenix/spark/ProductRDDFunctions.scala | 1 + .../phoenix/spark/SparkContextFunctions.scala | 1 + .../org/apache/phoenix/spark/SparkSchemaUtil.scala | 84 ++++ .../phoenix/spark/SparkSqlContextFunctions.scala | 1 + .../datasources/jdbc/PhoenixJdbcDialect.scala | 21 + .../execution/datasources/jdbc/SparkJdbcUtil.scala | 309 ++++++++++++ pom.xml | 2 +- 32 files changed, 1655 insertions(+), 527 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java index 3051cd6..ef127ac 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java @@ -194,7 +194,7 @@ public abstract class BaseSaltedTableIT extends ParallelStatsDisabledIT { .setSelectColumns( Lists.newArrayList("A_INTEGER", "A_STRING", "A_ID", "B_STRING", "B_INTEGER")) .setFullTableName(tableName) - .setWhereClause("a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'"); + .setWhereClause("A_INTEGER = 1 AND A_STRING >= 'ab' AND A_STRING < 'de' AND A_ID = '123'"); rs = executeQuery(conn, queryBuilder); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); @@ -205,7 +205,7 @@ public abstract class BaseSaltedTableIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); // all single slots with one value. 
- queryBuilder.setWhereClause("a_integer = 1 AND a_string = 'ab' AND a_id = '123'"); + queryBuilder.setWhereClause("A_INTEGER = 1 AND A_STRING = 'ab' AND A_ID = '123'"); rs = executeQuery(conn, queryBuilder); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); @@ -216,7 +216,7 @@ public abstract class BaseSaltedTableIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); // all single slots with multiple values. - queryBuilder.setWhereClause("a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'"); + queryBuilder.setWhereClause("A_INTEGER in (2, 4) AND A_STRING = 'abc' AND A_ID = '123'"); rs = executeQuery(conn, queryBuilder); assertTrue(rs.next()); diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml index 038e314..f426c83 100644 --- a/phoenix-spark/pom.xml +++ b/phoenix-spark/pom.xml @@ -487,6 +487,14 @@ <testSourceDirectory>src/it/scala</testSourceDirectory> <testResources><testResource><directory>src/it/resources</directory></testResource></testResources> <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java index bdffaf5..4c60bc8 100644 --- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java +++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java @@ -16,12 +16,13 @@ import java.util.List; import java.util.Properties; import org.apache.phoenix.end2end.BaseOrderByIT; +import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryBuilder; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import org.junit.Ignore; +import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.junit.Test; import com.google.common.collect.Lists; @@ -106,22 +107,15 @@ public class OrderByIT extends BaseOrderByIT { // create two PhoenixRDDs using the table names and columns that are required for the JOIN query List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D"); - SQLContext sqlContext = SparkUtil.getSqlContext(); - Dataset phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), tableName1, - JavaConverters.collectionAsScalaIterableConverter(table1Columns) - .asScala().toSeq(), - Option.apply((String) null), Option.apply(getUrl()), config, false, - null).toDataFrame(sqlContext); - phoenixDataSet.registerTempTable(tableName1); - List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1"); - phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), tableName2, - JavaConverters.collectionAsScalaIterableConverter(table2Columns) - .asScala().toSeq(), - Option.apply((String) null), Option.apply(getUrl()), config, false, - null).toDataFrame(sqlContext); - phoenixDataSet.registerTempTable(tableName2); + SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext(); + Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, tableName1) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + phoenixDataSet.createOrReplaceTempView(tableName1); + phoenixDataSet = 
SparkUtil.getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, tableName2) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + phoenixDataSet.createOrReplaceTempView(tableName2); String query = "SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2 @@ -155,8 +149,8 @@ public class OrderByIT extends BaseOrderByIT { assertFalse(rs.next()); query = - "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2 - + " t2 on t1.a_string = t2.a_string order by t2.col1"; + "SELECT T1.A_STRING, T2.COL1 FROM " + tableName1 + " T1 JOIN " + tableName2 + + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T2.COL1"; dataset = sqlContext.sql(query); rows = dataset.collectAsList(); rs = new SparkResultSet(rows, dataset.columns()); @@ -228,23 +222,15 @@ public class OrderByIT extends BaseOrderByIT { conn.commit(); - List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D"); - SQLContext sqlContext = SparkUtil.getSqlContext(); - Dataset phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), tableName1, - JavaConverters.collectionAsScalaIterableConverter(table1Columns) - .asScala().toSeq(), - Option.apply((String) null), Option.apply(getUrl()), config, false, - null).toDataFrame(sqlContext); - phoenixDataSet.registerTempTable(tableName1); - List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1"); - phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), tableName2, - JavaConverters.collectionAsScalaIterableConverter(table2Columns) - .asScala().toSeq(), - Option.apply((String) null), Option.apply(getUrl()), config, false, - null).toDataFrame(sqlContext); - phoenixDataSet.registerTempTable(tableName2); + SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext(); + Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, tableName1) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + phoenixDataSet.createOrReplaceTempView(tableName1); + phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, tableName2) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + phoenixDataSet.createOrReplaceTempView(tableName2); String query = "select a_string, `cf2.d` from " + tableName1 + " union all select * from " @@ -311,17 +297,11 @@ public class OrderByIT extends BaseOrderByIT { stmt.execute(); conn.commit(); - SQLContext sqlContext = SparkUtil.getSqlContext(); - Dataset phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), tableName, - JavaConverters - .collectionAsScalaIterableConverter( - Lists.newArrayList("col1", "col2", "col4")) - .asScala().toSeq(), - Option.apply((String) null), Option.apply(getUrl()), config, false, - null).toDataFrame(sqlContext); - - phoenixDataSet.registerTempTable(tableName); + SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext(); + Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, tableName) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + phoenixDataSet.createOrReplaceTempView(tableName); Dataset<Row> dataset = sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName + " ORDER BY col1+col2, col4"); @@ -379,19 +359,11 @@ public class OrderByIT extends BaseOrderByIT { conn.commit(); - List<String> columns = - Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D", - "COL2"); - - SQLContext sqlContext = 
SparkUtil.getSqlContext(); - Dataset phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), tableName, - JavaConverters.collectionAsScalaIterableConverter(columns).asScala() - .toSeq(), - Option.apply((String) null), Option.apply(url), config, false, null) - .toDataFrame(sqlContext); - - phoenixDataSet.registerTempTable(tableName); + SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext(); + Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, tableName) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + phoenixDataSet.createOrReplaceTempView(tableName); Dataset<Row> dataset = sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from " + tableName + " ORDER BY `CF1.A`,`CF2.C`"); diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java index 6285209..668c3c8 100644 --- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java +++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java @@ -22,13 +22,14 @@ import com.google.common.base.Joiner; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; import org.apache.phoenix.util.QueryBuilder; -import org.apache.spark.SparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.execution.SparkPlan; +import org.apache.spark.sql.sources.v2.DataSourceOptions; import scala.Option; import scala.collection.JavaConverters; @@ -43,19 +44,14 @@ public class SparkUtil { public static final String NUM_EXECUTORS = "local[2]"; public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress"; - public static SparkContext getSparkContext() { + public static SparkSession getSparkSession() { return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS) - .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sparkContext(); - } - - public static SQLContext getSqlContext() { - return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS) - .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sqlContext(); + .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate(); } public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config) throws SQLException { - SQLContext sqlContext = SparkUtil.getSqlContext(); + SQLContext sqlContext = getSparkSession().sqlContext(); boolean forceRowKeyOrder = conn.unwrap(PhoenixConnection.class).getQueryServices().getProps() @@ -69,14 +65,11 @@ public class SparkUtil { // create PhoenixRDD using the table name and columns that are required by the query // since we don't set the predicate filtering is done after rows are returned from spark - Dataset phoenixDataSet = - new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(), - JavaConverters.collectionAsScalaIterableConverter(queryBuilder.getRequiredColumns()).asScala() - .toSeq(), - Option.apply((String) null), Option.apply(url), config, false, - null).toDataFrame(sqlContext); + Dataset phoenixDataSet = getSparkSession().read().format("phoenix") + .option(DataSourceOptions.TABLE_KEY, queryBuilder.getFullTableName()) + .option(PhoenixDataSource.ZOOKEEPER_URL, url).load(); - 
phoenixDataSet.registerTempTable(queryBuilder.getFullTableName()); + phoenixDataSet.createOrReplaceTempView(queryBuilder.getFullTableName()); Dataset<Row> dataset = sqlContext.sql(queryBuilder.build()); SparkPlan plan = dataset.queryExecution().executedPlan(); List<Row> rows = dataset.collectAsList(); diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql index 7ac0039..efdb8cb 100644 --- a/phoenix-spark/src/it/resources/globalSetup.sql +++ b/phoenix-spark/src/it/resources/globalSetup.sql @@ -26,9 +26,9 @@ UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1') UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2') UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3') UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4') -CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR) -UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo') -UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar') +CREATE TABLE "table4" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR) +UPSERT INTO "table4" ("id", "col1") VALUES (1, 'foo') +UPSERT INTO "table4" ("id", "col1") VALUES (2, 'bar') CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[]) UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3']) CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[]) diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala index ca3470f..a9c2070 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala +++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala @@ -19,6 +19,7 @@ import java.util.Properties import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT import org.apache.phoenix.query.BaseTest import org.apache.phoenix.util.PhoenixRuntime +import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite, Matchers} @@ -50,7 +51,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter final val TenantId = "theTenant" var conn: Connection = _ - var sc: SparkContext = _ + var spark: SparkSession = _ lazy val hbaseConfiguration = { val conf = PhoenixSparkITHelper.getTestClusterConfig @@ -99,12 +100,17 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter .setMaster("local[2]") // 2 threads, some parallelism .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress - sc = new SparkContext(conf) + spark = SparkSession + .builder() + .appName("PhoenixSparkIT") + .master("local[2]") // 2 threads, some parallelism + .config("spark.ui.showConsoleProgress", "false") + .getOrCreate() } override def afterAll() { conn.close() - sc.stop() + spark.stop() PhoenixSparkITHelper.cleanUpAfterTest() PhoenixSparkITHelper.doTeardown } diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index d1e38fa..d6d0f92 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -13,16 +13,16 @@ */ package 
org.apache.phoenix.spark +import java.sql.DriverManager import java.util.Date import org.apache.phoenix.schema.types.PVarchar +import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource import org.apache.phoenix.util.{ColumnInfo, SchemaUtil} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Row, SQLContext, SaveMode} -import org.joda.time.DateTime -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{Row, SaveMode} + import scala.collection.mutable.ListBuffer -import org.apache.hadoop.conf.Configuration /** * Note: If running directly from an IDE, these are the recommended VM parameters: @@ -30,13 +30,20 @@ import org.apache.hadoop.conf.Configuration */ class PhoenixSparkIT extends AbstractPhoenixSparkIT { - test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.createDataFrame( + test("Can persist data with case sensitive columns (like in avro schema)") { + val df = spark.createDataFrame( Seq( (1, 1, "test_child_1"), - (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1") - df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress),skipNormalizingIdentifier=true) + (2, 1, "test_child_2"))). + // column names are case sensitive + toDF("ID", "TABLE3_ID", "t2col1") + df.write + .format("phoenix") + .options(Map("table" -> "TABLE3", + PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true")) + .mode(SaveMode.Overwrite) + .save() + // Verify results val stmt = conn.createStatement() @@ -50,7 +57,29 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { stmt.close() results.toList shouldEqual checkResults + } + + // INSERT is not support using DataSource v2 api yet + ignore("Can use write data using spark SQL INSERT") { + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE3", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + df1.createOrReplaceTempView("TABLE3") + + // Insert data + spark.sql("INSERT INTO TABLE3 VALUES(10, 10, 10)") + spark.sql("INSERT INTO TABLE3 VALUES(20, 20, 20)") + + // Verify results + val stmt = conn.createStatement() + val rs = stmt.executeQuery("SELECT * FROM TABLE3 WHERE ID>=10") + val expectedResults = List((10, 10, "10"), (20, 20, "20")) + val results = ListBuffer[(Long, Long, String)]() + while (rs.next()) { + results.append((rs.getLong(1), rs.getLong(2), rs.getString(3))) + } + stmt.close() + results.toList shouldEqual expectedResults } test("Can convert Phoenix schema") { @@ -58,29 +87,25 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType) ) - val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"), - conf = hbaseConfiguration) - - val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema) + val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema) - val expected = List(StructField("varcharColumn", StringType, nullable = true)) + val expected = new StructType(List(StructField("varcharColumn", StringType, nullable = true)).toArray) catalystSchema shouldEqual expected } test("Can create schema RDD and execute query") { - val sqlContext = new SQLContext(sc) + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load - val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration) + 
df1.createOrReplaceTempView("sql_table_1") - df1.registerTempTable("sql_table_1") + val df2 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load - val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"), - conf = hbaseConfiguration) + df2.createOrReplaceTempView("sql_table_2") - df2.registerTempTable("sql_table_2") - - val sqlRdd = sqlContext.sql( + val sqlRdd = spark.sql( """ |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin @@ -91,18 +116,49 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { count shouldEqual 6L } - test("Can create schema RDD and execute query on case sensitive table (no config)") { - val sqlContext = new SQLContext(sc) + ignore("Ordering by pk columns should not require sorting") { + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + df1.createOrReplaceTempView("TABLE1") + val sqlRdd = spark.sql("SELECT * FROM TABLE1 ORDER BY ID, COL1") + val plan = sqlRdd.queryExecution.sparkPlan + // verify the spark plan doesn't have a sort + assert(!plan.toString.contains("Sort")) - val df1 = sqlContext.phoenixTableAsDataFrame( - SchemaUtil.getEscapedArgument("table3"), - Array("id", "col1"), - zkUrl = Some(quorumAddress)) + val expectedResults = Array(Row.fromSeq(Seq(1, "test_row_1")), Row.fromSeq(Seq(2, "test_row_2"))) + val actual = sqlRdd.collect() - df1.registerTempTable("table3") + actual shouldEqual expectedResults + } + + test("Verify correct number of partitions are created") { + val conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl) + val ddl = "CREATE TABLE SPLIT_TABLE (id VARCHAR NOT NULL PRIMARY KEY, val VARCHAR) split on ('e','j','o')" + conn.createStatement.execute(ddl) + val keys = Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", + "t", "u", "v", "w", "x", "y", "z") + for (key <- keys) { + conn.createStatement.execute("UPSERT INTO SPLIT_TABLE VALUES('" + key + "', '" + key + "')") + } + conn.commit() - val sqlRdd = sqlContext.sql("SELECT * FROM table3") + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "SPLIT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + df1.createOrReplaceTempView("SPLIT_TABLE") + val sqlRdd = spark.sql("SELECT * FROM SPLIT_TABLE") + val numPartitions = sqlRdd.rdd.partitions.size + + numPartitions shouldEqual 4 + } + + test("Can create schema RDD and execute query on case sensitive table (no config)") { + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> SchemaUtil.getEscapedArgument("table4"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load + + df1.createOrReplaceTempView("table4") + + val sqlRdd = spark.sql("SELECT id FROM table4") val count = sqlRdd.count() @@ -110,20 +166,17 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can create schema RDD and execute constrained query") { - val sqlContext = new SQLContext(sc) - - val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), - conf = hbaseConfiguration) + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load - df1.registerTempTable("sql_table_1") + df1.createOrReplaceTempView("sql_table_1") - val df2 = 
sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"), - predicate = Some("\"ID\" = 1"), - conf = hbaseConfiguration) + val df2 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = 1") - df2.registerTempTable("sql_table_2") + df2.createOrReplaceTempView("sql_table_2") - val sqlRdd = sqlContext.sql( + val sqlRdd = spark.sql( """ |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin @@ -135,17 +188,12 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can create schema RDD with predicate that will never match") { - val sqlContext = new SQLContext(sc) - - val df1 = sqlContext.phoenixTableAsDataFrame( - SchemaUtil.getEscapedArgument("table3"), - Array("id", "col1"), - predicate = Some("\"id\" = -1"), - conf = hbaseConfiguration) + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = -1") - df1.registerTempTable("table3") + df1.createOrReplaceTempView("table3") - val sqlRdd = sqlContext.sql("SELECT * FROM table3") + val sqlRdd = spark.sql("SELECT * FROM table3") val count = sqlRdd.count() @@ -153,21 +201,17 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can create schema RDD with complex predicate") { - val sqlContext = new SQLContext(sc) - - val df1 = sqlContext.phoenixTableAsDataFrame( - "DATE_PREDICATE_TEST_TABLE", - Array("ID", "TIMESERIES_KEY"), - predicate = Some( - """ - |ID > 0 AND TIMESERIES_KEY BETWEEN - |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND - |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin), - conf = hbaseConfiguration) + val predicate = "ID > 0 AND TIMESERIES_KEY BETWEEN " + + "CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND " + + "CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)" + val df1 = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> "DATE_PREDICATE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .load + .filter(predicate) - df1.registerTempTable("date_predicate_test_table") + df1.createOrReplaceTempView("date_predicate_test_table") - val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table") + val sqlRdd = spark.sqlContext.sql("SELECT * FROM date_predicate_test_table") val count = sqlRdd.count() @@ -175,14 +219,12 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can query an array table") { - val sqlContext = new SQLContext(sc) + val df1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load - val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"), - conf = hbaseConfiguration) + df1.createOrReplaceTempView("ARRAY_TEST_TABLE") - df1.registerTempTable("ARRAY_TEST_TABLE") - - val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE") + val sqlRdd = spark.sql("SELECT * FROM ARRAY_TEST_TABLE") val count = sqlRdd.count() @@ -195,12 +237,12 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can read a table as an RDD") { - val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"), - conf = hbaseConfiguration) + val rdd1 = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "ARRAY_TEST_TABLE", 
PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load val count = rdd1.count() - val arrayValues = rdd1.take(1)(0)("VCARRAY") + val arrayValues = rdd1.take(1)(0)(1) arrayValues should equal(Array("String1", "String2", "String3")) @@ -208,24 +250,30 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can save to phoenix table") { - val sqlContext = new SQLContext(sc) + val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3)) + + val schema = StructType( + Seq(StructField("ID", LongType, nullable = false), + StructField("COL1", StringType), + StructField("COL2", IntegerType))) - val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3)) + val rowRDD = spark.sparkContext.parallelize(dataSet) - sc - .parallelize(dataSet) - .saveToPhoenix( - "OUTPUT_TEST_TABLE", - Seq("ID", "COL1", "COL2"), - hbaseConfiguration - ) + // Apply the schema to the RDD. + val df = spark.sqlContext.createDataFrame(rowRDD, schema) + + df.write + .format("phoenix") + .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() // Load the results back val stmt = conn.createStatement() val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE") - val results = ListBuffer[(Long, String, Int)]() + val results = ListBuffer[Row]() while (rs.next()) { - results.append((rs.getLong(1), rs.getString(2), rs.getInt(3))) + results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3))) } // Verify they match @@ -234,18 +282,29 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } } - test("Can save Java and Joda dates to Phoenix (no config)") { - val dt = new DateTime() - val date = new Date() + test("Can save dates to Phoenix using java.sql.Date") { + val date = java.sql.Date.valueOf("2016-09-30") + + // Since we are creating a Row we have to use java.sql.date + // java.util.date or joda.DateTime is not supported + val dataSet = Seq(Row(1L, "1", 1, date), Row(2L, "2", 2, date)) + + val schema = StructType( + Seq(StructField("ID", LongType, nullable = false), + StructField("COL1", StringType), + StructField("COL2", IntegerType), + StructField("COL3", DateType))) - val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date)) - sc - .parallelize(dataSet) - .saveToPhoenix( - "OUTPUT_TEST_TABLE", - Seq("ID", "COL1", "COL2", "COL3"), - zkUrl = Some(quorumAddress) - ) + val rowRDD = spark.sparkContext.parallelize(dataSet) + + // Apply the schema to the RDD. 
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema) + + df.write + .format("phoenix") + .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() // Load the results back val stmt = conn.createStatement() @@ -256,94 +315,56 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } // Verify the epochs are equal - results(0).getTime shouldEqual dt.getMillis + results(0).getTime shouldEqual date.getTime results(1).getTime shouldEqual date.getTime } test("Can infer schema without defining columns") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration) + val df = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load() df.schema("ID").dataType shouldEqual LongType df.schema("TABLE1_ID").dataType shouldEqual LongType df.schema("t2col1").dataType shouldEqual StringType } test("Spark SQL can use Phoenix as a data source with no schema specified") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", - "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load df.count() shouldEqual 2 df.schema("ID").dataType shouldEqual LongType df.schema("COL1").dataType shouldEqual StringType } - test("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", - "zkUrl" -> quorumAddress)) + test("Datasource v2 pushes down filters") { + val df = spark.sqlContext.read.format("phoenix") + .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID")) // Make sure we got the right value back assert(res.first().getLong(0) == 1L) val plan = res.queryExecution.sparkPlan - // filters should be pushed into phoenix relation - assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " + - "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]")) - // spark should run the filters on the rows returned by Phoenix - assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) " - + " && (COL1.* = test_row_1)) && (ID.* = 1)).*")) + // filters should be pushed into scan + assert(".*ScanV2 phoenix.*Filters.*ID.*COL1.*".r.findFirstIn(plan.toString).isDefined) + // spark should not do post scan filtering + assert(".*Filter .*ID.*COL1.*".r.findFirstIn(plan.toString).isEmpty) } - test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") { + test("Can persist a dataframe") { // Load from TABLE1 - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", - "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read.format("phoenix").options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load // Save to TABLE1_COPY - df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress)) - - // Verify results - val stmt = conn.createStatement() - val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY") - - val checkResults = List((1L, "test_row_1"), (2, "test_row_2")) - val results = ListBuffer[(Long, String)]() - while (rs.next()) { - results.append((rs.getLong(1), 
rs.getString(2))) - } - stmt.close() - - results.toList shouldEqual checkResults - } - - test("Can persist a dataframe using 'DataFrame.save()") { - // Clear TABLE1_COPY - var stmt = conn.createStatement() - stmt.executeUpdate("DELETE FROM TABLE1_COPY") - stmt.close() - - // Load TABLE1, save as TABLE1_COPY - val sqlContext = new SQLContext(sc) - val df = sqlContext - .read - .format("org.apache.phoenix.spark") - .option("table", "TABLE1") - .option("zkUrl", quorumAddress) - .load() - - // Save to TABLE21_COPY df .write - .format("org.apache.phoenix.spark") + .format("phoenix") .mode(SaveMode.Overwrite) .option("table", "TABLE1_COPY") - .option("zkUrl", quorumAddress) + .option(PhoenixDataSource.ZOOKEEPER_URL, quorumAddress) .save() // Verify results - stmt = conn.createStatement() + val stmt = conn.createStatement() val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY") val checkResults = List((1L, "test_row_1"), (2, "test_row_2")) @@ -357,15 +378,22 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can save arrays back to phoenix") { - val dataSet = List((2L, Array("String1", "String2", "String3"))) + val dataSet = List(Row(2L, Array("String1", "String2", "String3"))) + val schema = StructType(Seq( + StructField("ID", LongType, nullable = false), + StructField("VCARRAY", ArrayType(StringType, true)) + )) - sc - .parallelize(dataSet) - .saveToPhoenix( - "ARRAY_TEST_TABLE", - Seq("ID", "VCARRAY"), - zkUrl = Some(quorumAddress) - ) + val rowRDD = spark.sparkContext.parallelize(dataSet) + + // Apply the schema to the RDD. + val df = spark.sqlContext.createDataFrame(rowRDD, schema) + + df.write + .format("phoenix") + .options(Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() // Load the results back val stmt = conn.createStatement() @@ -374,57 +402,54 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]] // Verify the arrays are equal - sqlArray shouldEqual dataSet(0)._2 + sqlArray shouldEqual dataSet(0).get(1) } test("Can read from table with schema and escaped table name") { // Manually escape - val rdd1 = sc.phoenixTableAsRDD( - "CUSTOM_ENTITY.\"z02\"", - Seq("ID"), - conf = hbaseConfiguration) + val df1 = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> "CUSTOM_ENTITY.\"z02\"", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load() - var count = rdd1.count() + var count = df1.count() count shouldEqual 1L // Use SchemaUtil - val rdd2 = sc.phoenixTableAsRDD( - SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"), - Seq("ID"), - conf = hbaseConfiguration) + val df2 = spark.sqlContext.read.format("phoenix") + .options( + Map("table" -> SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .load() - count = rdd2.count() + count = df2.count() count shouldEqual 1L - } test("Ensure DataFrame field normalization (PHOENIX-2196)") { - val rdd1 = sc + val rdd1 = spark.sparkContext .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two"))) .map(p => Row(p._1, p._2, p._3)) - val sqlContext = new SQLContext(sc) - val schema = StructType(Seq( StructField("id", LongType, nullable = false), StructField("table1_id", LongType, nullable = true), StructField("\"t2col1\"", StringType, nullable = true) )) - val df = sqlContext.createDataFrame(rdd1, schema) + val df = spark.sqlContext.createDataFrame(rdd1, schema) - df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress)) + df.write + 
.format("phoenix") + .options(Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() } test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", - "zkUrl" -> quorumAddress)) - + val df = spark.sqlContext.read.format("phoenix").options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load() // Prefix match val res1 = df.filter("COL1 like 'test_row_%'") + val plan = res1.groupBy().count().queryExecution.sparkPlan res1.count() shouldEqual 2 // Suffix match @@ -463,14 +488,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can load decimal types with accurate precision and scale (PHOENIX-2288)") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_DECIMAL", "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> "TEST_DECIMAL", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load() assert(df.select("COL1").first().getDecimal(0) == BigDecimal("123.456789").bigDecimal) } - test("Can load small and tiny integeger types (PHOENIX-2426)") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_SMALL_TINY", "zkUrl" -> quorumAddress)) + test("Can load small and tiny integer types (PHOENIX-2426)") { + val df = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> "TEST_SMALL_TINY", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load() assert(df.select("COL1").first().getShort(0).toInt == 32767) assert(df.select("COL2").first().getByte(0).toInt == 127) } @@ -478,21 +503,19 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { test("Can save arrays from custom dataframes back to phoenix") { val dataSet = List(Row(2L, Array("String1", "String2", "String3"), Array(1, 2, 3))) - val sqlContext = new SQLContext(sc) - val schema = StructType( Seq(StructField("ID", LongType, nullable = false), StructField("VCARRAY", ArrayType(StringType)), StructField("INTARRAY", ArrayType(IntegerType)))) - val rowRDD = sc.parallelize(dataSet) + val rowRDD = spark.sparkContext.parallelize(dataSet) // Apply the schema to the RDD. - val df = sqlContext.createDataFrame(rowRDD, schema) + val df = spark.sqlContext.createDataFrame(rowRDD, schema) df.write - .format("org.apache.phoenix.spark") - .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", "zkUrl" -> quorumAddress)) + .format("phoenix") + .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) .mode(SaveMode.Overwrite) .save() @@ -509,15 +532,23 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can save arrays of AnyVal type back to phoenix") { - val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L))) + val dataSet = List(Row(2L, Array(1, 2, 3), Array(1L, 2L, 3L))) + + val schema = StructType( + Seq(StructField("ID", LongType, nullable = false), + StructField("INTARRAY", ArrayType(IntegerType)), + StructField("BIGINTARRAY", ArrayType(LongType)))) + + val rowRDD = spark.sparkContext.parallelize(dataSet) + + // Apply the schema to the RDD. 
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema) - sc - .parallelize(dataSet) - .saveToPhoenix( - "ARRAY_ANYVAL_TEST_TABLE", - Seq("ID", "INTARRAY", "BIGINTARRAY"), - zkUrl = Some(quorumAddress) - ) + df.write + .format("phoenix") + .options(Map("table" -> "ARRAY_ANYVAL_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() // Load the results back val stmt = conn.createStatement() @@ -527,20 +558,27 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]] // Verify the arrays are equal - intArray shouldEqual dataSet(0)._2 - longArray shouldEqual dataSet(0)._3 + intArray shouldEqual dataSet(0).get(1) + longArray shouldEqual dataSet(0).get(2) } test("Can save arrays of Byte type back to phoenix") { - val dataSet = List((2L, Array(1.toByte, 2.toByte, 3.toByte))) + val dataSet = List(Row(2L, Array(1.toByte, 2.toByte, 3.toByte))) - sc - .parallelize(dataSet) - .saveToPhoenix( - "ARRAY_BYTE_TEST_TABLE", - Seq("ID", "BYTEARRAY"), - zkUrl = Some(quorumAddress) - ) + val schema = StructType( + Seq(StructField("ID", LongType, nullable = false), + StructField("BYTEARRAY", ArrayType(ByteType)))) + + val rowRDD = spark.sparkContext.parallelize(dataSet) + + // Apply the schema to the RDD. + val df = spark.sqlContext.createDataFrame(rowRDD, schema) + + df.write + .format("phoenix") + .options(Map("table" -> "ARRAY_BYTE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() // Load the results back val stmt = conn.createStatement() @@ -549,19 +587,28 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { val byteArray = rs.getArray(1).getArray().asInstanceOf[Array[Byte]] // Verify the arrays are equal - byteArray shouldEqual dataSet(0)._2 + byteArray shouldEqual dataSet(0).get(1) } test("Can save binary types back to phoenix") { - val dataSet = List((2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2)))) + val dataSet = List(Row(2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2)))) + + val schema = StructType( + Seq(StructField("ID", LongType, false), + StructField("BIN", BinaryType), + StructField("VARBIN", BinaryType), + StructField("BINARRAY", ArrayType(BinaryType)))) + + val rowRDD = spark.sparkContext.parallelize(dataSet) + + // Apply the schema to the RDD. 
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema) - sc - .parallelize(dataSet) - .saveToPhoenix( - "VARBINARY_TEST_TABLE", - Seq("ID", "BIN", "VARBIN", "BINARRAY"), - zkUrl = Some(quorumAddress) - ) + df.write + .format("phoenix") + .options(Map("table" -> "VARBINARY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .mode(SaveMode.Overwrite) + .save() // Load the results back val stmt = conn.createStatement() @@ -572,16 +619,15 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { val varByteArray = rs.getArray("BINARRAY").getArray().asInstanceOf[Array[Array[Byte]]] // Verify the arrays are equal - byte shouldEqual dataSet(0)._2 - varByte shouldEqual dataSet(0)._3 - varByteArray shouldEqual dataSet(0)._4 + byte shouldEqual dataSet(0).get(1) + varByte shouldEqual dataSet(0).get(2) + varByteArray shouldEqual dataSet(0).get(3) } test("Can load Phoenix DATE columns through DataFrame API") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.read - .format("org.apache.phoenix.spark") - .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read + .format("phoenix") + .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) .load val dt = df.select("COL1").first().getDate(0).getTime val epoch = new Date().getTime @@ -595,37 +641,37 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"), - "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> SchemaUtil.getEscapedArgument("space"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .load val res = df.filter(df.col("first name").equalTo("xyz")) // Make sure we got the right value back assert(res.collectAsList().size() == 1L) } test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"), - "zkUrl" -> quorumAddress)) - df.registerTempTable("temp") + val df = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .load + df.createOrReplaceTempView("temp") // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. 
giving to next level to handle - val res1 = sqlContext.sql("select * from temp where salary = '10000' ") + val res1 = spark.sql("select * from temp where salary = '10000' ") assert(res1.collectAsList().size() == 1L) - val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ") + val res2 = spark.sql("select * from temp where \"salary\" = '10000' ") assert(res2.collectAsList().size() == 0L) - val res3 = sqlContext.sql("select * from temp where salary > '10000' ") + val res3 = spark.sql("select * from temp where salary > '10000' ") assert(res3.collectAsList().size() == 2L) } test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"), - "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read.format("phoenix") + .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .load // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle @@ -644,10 +690,9 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.read - .format("org.apache.phoenix.spark") - .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress, "dateAsTimestamp" -> "true")) + val df = spark.sqlContext.read + .format("phoenix") + .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, "dateAsTimestamp" -> "true")) .load val dtRes = df.select("COL1").first() val ts = dtRes.getTimestamp(0).getTime @@ -657,10 +702,9 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("Can load Phoenix Time columns through DataFrame API") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.read - .format("org.apache.phoenix.spark") - .options(Map("table" -> "TIME_TEST", "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read + .format("phoenix") + .options(Map("table" -> "TIME_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) .load val time = df.select("COL1").first().getTimestamp(0).getTime val epoch = new Date().getTime @@ -668,13 +712,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { } test("can read all Phoenix data types") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "GIGANTIC_TABLE", - "zkUrl" -> quorumAddress)) + val df = spark.sqlContext.read + .format("phoenix") + .options(Map("table" -> "GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) + .load df.write - .format("org.apache.phoenix.spark") - .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", "zkUrl" -> quorumAddress)) + .format("phoenix") + .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)) .mode(SaveMode.Overwrite) .save() diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala index 77b41af..291ea2a 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala +++ 
b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala @@ -64,8 +64,7 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT { /*****************/ test("Can read from tenant-specific table as DataFrame") { - val sqlContext = new SQLContext(sc) - val df = sqlContext.phoenixTableAsDataFrame( + val df = spark.sqlContext.phoenixTableAsDataFrame( TenantTable, Seq(OrgIdCol, TenantOnlyCol), zkUrl = Some(quorumAddress), @@ -78,7 +77,7 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT { } test("Can read from tenant-specific table as RDD") { - val rdd = sc.phoenixTableAsRDD( + val rdd = spark.sparkContext.phoenixTableAsRDD( TenantTable, Seq(OrgIdCol, TenantOnlyCol), zkUrl = Some(quorumAddress), @@ -95,23 +94,23 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT { /*****************/ test("Can write a DataFrame using 'DataFrame.saveToPhoenix' to tenant-specific view") { - val sqlContext = new SQLContext(sc) + val sqlContext = spark.sqlContext import sqlContext.implicits._ - val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol) + val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol) df.saveToPhoenix(TenantTable, zkUrl = Some(quorumAddress), tenantId = Some(TenantId)) verifyResults } test("Can write a DataFrame using 'DataFrame.write' to tenant-specific view") { - val sqlContext = new SQLContext(sc) + val sqlContext = spark.sqlContext import sqlContext.implicits._ - val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol) + val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol) df.write - .format("org.apache.phoenix.spark") + .format("phoenix") .mode("overwrite") .option("table", TenantTable) .option(PhoenixRuntime.TENANT_ID_ATTRIB, TenantId) @@ -122,8 +121,7 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT { } test("Can write an RDD to Phoenix tenant-specific view") { - val sqlContext = new SQLContext(sc) - sc + spark.sparkContext .parallelize(TestDataSet) .saveToPhoenix( TenantTable, diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java new file mode 100644 index 0000000..ad79d1c --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark.datasource.v2; + +import java.util.Optional; + +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader; +import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions; +import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; +import org.apache.spark.sql.types.StructType; + +/** + * Implements the DataSourceV2 api to read and write from Phoenix tables + */ +public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister { + + public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier"; + public static final String ZOOKEEPER_URL = "zkUrl"; + + @Override + public DataSourceReader createReader(DataSourceOptions options) { + return new PhoenixDataSourceReader(options); + } + + @Override + public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, + DataSourceOptions options) { + if (!mode.equals(SaveMode.Overwrite)) { + throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported"); + } + if (!options.tableName().isPresent()) { + throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined"); + } + if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) { + throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined"); + } + + PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema); + return Optional.of(new PhoenixDatasourceWriter(writeOptions)); + } + + private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options, + StructType schema) { + String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null); + String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null); + String zkUrl = options.get(ZOOKEEPER_URL).get(); + boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false); + return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get()) + .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema) + .setSkipNormalizingIdentifier(skipNormalizingIdentifier).build(); + } + + @Override + public String shortName() { + return "phoenix"; + } +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java new file mode 100644 index 0000000..8c2fdb1 --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.spark.datasource.v2.reader; + +import java.io.Serializable; + +public class PhoenixDataSourceReadOptions implements Serializable { + + private final String tenantId; + private final String zkUrl; + private final String scn; + private final String selectStatement; + + public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, String selectStatement) { + this.zkUrl = zkUrl; + this.scn = scn; + this.tenantId = tenantId; + this.selectStatement = selectStatement; + } + + public String getSelectStatement() { + return selectStatement; + } + + public String getScn() { + return scn; + } + + public String getZkUrl() { + return zkUrl; + } + + public String getTenantId() { + return tenantId; + } +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java new file mode 100644 index 0000000..446d96f --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark.datasource.v2.reader; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.RegionSizeCalculator; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.spark.FilterExpressionCompiler; +import org.apache.phoenix.spark.SparkSchemaUtil; +import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; +import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns; +import org.apache.spark.sql.types.StructType; +import scala.Tuple3; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters, + SupportsPushDownRequiredColumns { + + private final DataSourceOptions options; + private final String tableName; + private final String zkUrl; + private final boolean dateAsTimestamp; + + private StructType schema; + private Filter[] pushedFilters = new Filter[]{}; + // derived from pushedFilters + private String whereClause; + + public PhoenixDataSourceReader(DataSourceOptions options) { + if (!options.tableName().isPresent()) { + throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined"); + } + if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) { + throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined"); + } + this.options = options; + this.tableName = options.tableName().get(); + this.zkUrl = options.get("zkUrl").get(); + this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false); + setSchema(); + } + + /** + * Sets the schema using all the table columns before any column pruning has been done + */ + private void setSchema() { + try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl)) { + List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); + Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); + schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public StructType readSchema() { + return schema; + } 
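The reader above also honors a "dateAsTimestamp" option when it derives the Catalyst schema from PhoenixRuntime.generateColumnInfo; with it set, Phoenix DATE columns surface as Spark TimestampType instead of DateType. A minimal sketch, assuming the same table and quorum as in the earlier read example:

  val withTimestamps = spark.read
    .format("phoenix")
    .option("table", "INPUT_TABLE")      // assumed
    .option("zkUrl", "localhost:2181")   // assumed
    .option("dateAsTimestamp", "true")   // DATE columns mapped to TimestampType
    .load()

  withTimestamps.printSchema()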
+ + @Override + public Filter[] pushFilters(Filter[] filters) { + Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters); + whereClause = tuple3._1(); + pushedFilters = tuple3._3(); + return tuple3._2(); + } + + @Override + public List<InputPartition<InternalRow>> planInputPartitions() { + Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + // Generate splits based off statistics, or just region splits? + boolean splitByStats = options.getBoolean( + PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); + Properties overridingProps = new Properties(); + if(currentScnValue.isPresent()) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get()); + } + if (tenantId.isPresent()){ + overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get()); + } + try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) { + List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, Lists.newArrayList(schema.names())); + final Statement statement = conn.createStatement(); + final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause); + Preconditions.checkNotNull(selectStatement); + + final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); + // Optimize the query plan so that we potentially use secondary indexes + final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement); + final Scan scan = queryPlan.getContext().getScan(); + + // setting the snapshot configuration + Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); + if (snapshotName.isPresent()) + PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection(). 
+ getQueryServices().getConfiguration(), snapshotName.get()); + + // Initialize the query plan so it sets up the parallel scans + queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()); + + List<KeyRange> allSplits = queryPlan.getSplits(); + // Get the RegionSizeCalculator + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + org.apache.hadoop.hbase.client.Connection connection = + phxConn.getQueryServices().getAdmin().getConnection(); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(queryPlan + .getTableRef().getTable().getPhysicalName().toString())); + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection + .getAdmin()); + + final List<InputPartition<InternalRow>> partitions = Lists.newArrayListWithExpectedSize(allSplits.size()); + for (List<Scan> scans : queryPlan.getScans()) { + // Get the region location + HRegionLocation location = regionLocator.getRegionLocation( + scans.get(0).getStartRow(), + false + ); + + String regionLocation = location.getHostname(); + + // Get the region size + long regionSize = sizeCalculator.getRegionSize( + location.getRegionInfo().getRegionName() + ); + + PhoenixDataSourceReadOptions phoenixDataSourceOptions = new PhoenixDataSourceReadOptions(zkUrl, + currentScnValue.orElse(null), tenantId.orElse(null), selectStatement); + if (splitByStats) { + for (Scan aScan : scans) { + partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema, + new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation))); + } + } else { + partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema, + new PhoenixInputSplit(scans, regionSize, regionLocation))); + } + } + return partitions; + } catch (Exception e) { + throw new RuntimeException("Unable to plan query", e); + } + } + + @Override + public Filter[] pushedFilters() { + return pushedFilters; + } + + @Override + public void pruneColumns(StructType schema) { + this.schema = schema; + } +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java new file mode 100644 index 0000000..624ff0f --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
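To make the pruning and pushdown path above concrete, a hedged sketch continuing the read example shown earlier; the column names are assumptions and the generated SQL is only approximate:

  // pruneColumns() narrows the schema to the selected columns and pushFilters()
  // compiles supported filters into a Phoenix WHERE clause, so planInputPartitions()
  // ends up building a statement along the lines of:
  //   SELECT "ID", "COL1" FROM INPUT_TABLE WHERE "ID" > 100
  val pruned = df
    .select("ID", "COL1")
    .filter(df("ID") > 100)

  pruned.show()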
+ */ +package org.apache.phoenix.spark.datasource.v2.reader; + +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.apache.spark.SerializableWritable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; +import org.apache.spark.sql.types.StructType; + +public class PhoenixInputPartition implements InputPartition<InternalRow> { + + private SerializableWritable<PhoenixInputSplit> phoenixInputSplit; + private StructType schema; + private PhoenixDataSourceReadOptions options; + + public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) { + this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit); + this.schema = schema; + this.options = options; + } + + @Override + public InputPartitionReader<InternalRow> createPartitionReader() { + return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit); + } + +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java new file mode 100644 index 0000000..30e84db --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark.datasource.v2.reader; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.iterate.ConcatResultIterator; +import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.iterate.PeekingResultIterator; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.RoundRobinResultIterator; +import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.TableResultIterator; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.ScanMetricsHolder; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.spark.SerializableWritable; +import org.apache.spark.executor.InputMetrics; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.datasources.SparkJdbcUtil; +import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; +import org.apache.spark.sql.types.StructType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import scala.collection.Iterator; + +public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> { + + private SerializableWritable<PhoenixInputSplit> phoenixInputSplit; + private StructType schema; + private Iterator<InternalRow> iterator; + private PhoenixResultSet resultSet; + private InternalRow currentRow; + private PhoenixDataSourceReadOptions options; + + public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) { + this.options = options; + this.phoenixInputSplit = phoenixInputSplit; + this.schema = schema; + initialize(); + } + + private QueryPlan getQueryPlan() throws SQLException { + String scn = options.getScn(); + String tenantId = options.getTenantId(); + String zkUrl = options.getZkUrl(); + Properties overridingProps = new Properties(); + if (scn != null) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + } + if (tenantId != null) { + overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) { + final Statement statement = conn.createStatement(); + final String selectStatement = options.getSelectStatement(); + Preconditions.checkNotNull(selectStatement); + + final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); + // Optimize the query plan so that we potentially use secondary indexes + final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement); + return queryPlan; + } + } + + private void initialize() { + try { + final QueryPlan queryPlan = getQueryPlan(); + final List<Scan> scans = phoenixInputSplit.value().getScans(); + List<PeekingResultIterator> iterators = 
Lists.newArrayListWithExpectedSize(scans.size()); + StatementContext ctx = queryPlan.getContext(); + ReadMetricQueue readMetrics = ctx.getReadMetricsQueue(); + String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString(); + + // Clear the table region boundary cache to make sure long running jobs stay up to date + byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes(); + ConnectionQueryServices services = queryPlan.getContext().getConnection().getQueryServices(); + services.clearTableRegionCache(tableNameBytes); + + long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); + for (Scan scan : scans) { + // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599 + scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); + + PeekingResultIterator peekingResultIterator; + ScanMetricsHolder scanMetricsHolder = + ScanMetricsHolder.getInstance(readMetrics, tableName, scan, + queryPlan.getContext().getConnection().getLogLevel()); + final TableResultIterator tableResultIterator = + new TableResultIterator( + queryPlan.getContext().getConnection().getMutationState(), scan, + scanMetricsHolder, renewScannerLeaseThreshold, queryPlan, + MapReduceParallelScanGrouper.getInstance()); + peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); + iterators.add(peekingResultIterator); + } + ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators); + if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { + iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); + } + // Clone the row projector as it's not thread safe and would be used simultaneously by + // multiple threads otherwise. + this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext()); + this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics()); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean next() { + if (!iterator.hasNext()) { + return false; + } + currentRow = iterator.next(); + return true; + } + + @Override + public InternalRow get() { + return currentRow; + } + + @Override + public void close() throws IOException { + if(resultSet != null) { + try { + resultSet.close(); + } catch (SQLException e) { + throw new IOException(e); + } + } + } +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java new file mode 100644 index 0000000..781d1c8 --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
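For orientation, the executor-side contract that PhoenixInputPartitionReader fulfills is roughly the loop below. This is a simplified sketch, not Spark's actual execution code; partition stands for one of the PhoenixInputPartition instances planned above.

  val reader = partition.createPartitionReader()
  try {
    while (reader.next()) {        // advances the PhoenixResultSet-backed iterator
      val row = reader.get()       // current InternalRow produced via SparkJdbcUtil
      // ... hand the row to downstream operators ...
    }
  } finally {
    reader.close()                 // closes the underlying PhoenixResultSet
  }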
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.spark.datasource.v2.writer; + +import org.apache.spark.sql.types.StructType; + +import java.io.Serializable; + +public class PhoenixDataSourceWriteOptions implements Serializable { + + private final String tableName; + private final String zkUrl; + private final String tenantId; + private final String scn; + private final StructType schema; + private final boolean skipNormalizingIdentifier; + + private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, String tenantId, + StructType schema, boolean skipNormalizingIdentifier) { + this.tableName = tableName; + this.zkUrl = zkUrl; + this.scn = scn; + this.tenantId = tenantId; + this.schema = schema; + this.skipNormalizingIdentifier = skipNormalizingIdentifier; + } + + public String getScn() { + return scn; + } + + public String getZkUrl() { + return zkUrl; + } + + public String getTenantId() { + return tenantId; + } + + public StructType getSchema() { + return schema; + } + + public String getTableName() { + return tableName; + } + + public boolean skipNormalizingIdentifier() { + return skipNormalizingIdentifier; + } + + public static class Builder { + private String tableName; + private String zkUrl; + private String scn; + private String tenantId; + private StructType schema; + private boolean skipNormalizingIdentifier; + + public Builder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Builder setZkUrl(String zkUrl) { + this.zkUrl = zkUrl; + return this; + } + + public Builder setScn(String scn) { + this.scn = scn; + return this; + } + + public Builder setTenantId(String tenantId) { + this.tenantId = tenantId; + return this; + } + + public Builder setSchema(StructType schema) { + this.schema = schema; + return this; + } + + public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) { + this.skipNormalizingIdentifier = skipNormalizingIdentifier; + return this; + } + + public PhoenixDataSourceWriteOptions build() { + return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, skipNormalizingIdentifier); + } + } +} \ No newline at end of file diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java new file mode 100644 index 0000000..cf42aa5 --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java @@ -0,0 +1,100 @@ +package org.apache.phoenix.spark.datasource.v2.writer; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import 
org.apache.spark.sql.execution.datasources.SparkJdbcUtil; +import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import com.google.common.collect.Lists; + +public class PhoenixDataWriter implements DataWriter<InternalRow> { + + private final StructType schema; + private final Connection conn; + private final PreparedStatement statement; + + public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) { + String scn = options.getScn(); + String tenantId = options.getTenantId(); + String zkUrl = options.getZkUrl(); + Properties overridingProps = new Properties(); + if (scn != null) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + } + if (tenantId != null) { + overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + this.schema = options.getSchema(); + try { + this.conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps); + List<String> colNames = Lists.newArrayList(options.getSchema().names()); + if (!options.skipNormalizingIdentifier()){ + colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList()); + } + String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null); + this.statement = this.conn.prepareStatement(upsertSql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(InternalRow internalRow) throws IOException { + try { + int i=0; + for (StructField field : schema.fields()) { + DataType dataType = field.dataType(); + if (internalRow.isNullAt(i)) { + statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType, + PhoenixJdbcDialect$.MODULE$).jdbcNullType()); + } else { + Row row = SparkJdbcUtil.toRow(schema, internalRow); + SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i); + } + ++i; + } + statement.execute(); + } catch (SQLException e) { + throw new IOException("Exception while executing Phoenix prepared statement", e); + } + } + + @Override + public WriterCommitMessage commit() throws IOException { + try { + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + try { + statement.close(); + conn.close(); + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + return null; + } + + @Override + public void abort() throws IOException { + } +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java new file mode 100644 index 0000000..751fdfa --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java @@ -0,0 +1,19 @@ +package org.apache.phoenix.spark.datasource.v2.writer; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; + +public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> { + + private final PhoenixDataSourceWriteOptions options; + + public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) { + this.options = 
options; + } + + @Override + public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) { + return new PhoenixDataWriter(options); + } +} diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java new file mode 100644 index 0000000..7847609 --- /dev/null +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java @@ -0,0 +1,34 @@ +package org.apache.phoenix.spark.datasource.v2.writer; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; + +public class PhoenixDatasourceWriter implements DataSourceWriter { + + private final PhoenixDataSourceWriteOptions options; + + public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) { + this.options = options; + } + + @Override + public DataWriterFactory<InternalRow> createWriterFactory() { + return new PhoenixDataWriterFactory(options); + } + + @Override + public boolean useCommitCoordinator() { + return false; + } + + @Override + public void commit(WriterCommitMessage[] messages) { + } + + @Override + public void abort(WriterCommitMessage[] messages) { + } +} diff --git a/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..6eff1af --- /dev/null +++ b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +org.apache.phoenix.spark.datasource.v2.PhoenixDataSource \ No newline at end of file diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala index ca476e7..d555954 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala @@ -21,6 +21,7 @@ import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime} import scala.collection.JavaConversions._ +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") object ConfigurationUtil extends Serializable { def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = { diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala index be4a32b..3b0289d 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.DataFrame import scala.collection.JavaConversions._ - +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class DataFrameFunctions(data: DataFrame) extends Serializable { def saveToPhoenix(parameters: Map[String, String]): Unit = { saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), 
tenantId = parameters.get("TenantId"), diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala index e000b74..ccdf595 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala @@ -20,6 +20,7 @@ package org.apache.phoenix.spark import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider} import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class DefaultSource extends RelationProvider with CreatableRelationProvider { // Override 'RelationProvider.createRelation', this enables DataFrame.load() diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala similarity index 60% copy from phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala copy to phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala index 38bf29a..74ff67e 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala @@ -17,55 +17,27 @@ */ package org.apache.phoenix.spark -import org.apache.hadoop.conf.Configuration -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.sources._ +import java.sql.Timestamp +import java.text.Format + +import org.apache.phoenix.util.{DateUtil, SchemaUtil} import org.apache.phoenix.util.StringUtil.escapeStringConstant -import org.apache.phoenix.util.SchemaUtil - -case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext) - extends BaseRelation with PrunedFilteredScan { - - /* - This is the buildScan() implementing Spark's PrunedFilteredScan. - Spark SQL queries with columns or predicates specified will be pushed down - to us here, and we can pass that on to Phoenix. According to the docs, this - is an optimization, and the filtering/pruning will be re-evaluated again, - but this prevents having to load the whole table into Spark first. 
- */ - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - val(pushedFilters, unhandledFilters) = buildFilter(filters) - new PhoenixRDD( - sqlContext.sparkContext, - tableName, - requiredColumns, - Some(pushedFilters), - Some(zkUrl), - new Configuration(), - dateAsTimestamp - ).toDataFrame(sqlContext).rdd - } +import org.apache.spark.sql.sources._ - // Required by BaseRelation, this will return the full schema for a table - override def schema: StructType = { - new PhoenixRDD( - sqlContext.sparkContext, - tableName, - Seq(), - None, - Some(zkUrl), - new Configuration(), - dateAsTimestamp - ).toDataFrame(sqlContext).schema - } +class FilterExpressionCompiler() { + + val timeformatter:Format = DateUtil.getTimestampFormatter(DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID) - // Attempt to create Phoenix-accepted WHERE clauses from Spark filters, - // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector - private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = { + /** + * Attempt to create Phoenix-accepted WHERE clause from Spark filters, + * mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector + * + * @return tuple representing where clause (derived from supported filters), + * array of unsupported filters and array of supported filters + */ + def pushFilters(filters: Array[Filter]): (String, Array[Filter], Array[Filter]) = { if (filters.isEmpty) { - return ("" , Array[Filter]()) + return ("" , Array[Filter](), Array[Filter]()) } val filter = new StringBuilder("") @@ -80,9 +52,30 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo f match { // Spark 1.3.1+ supported filters - case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter))) - case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter))) - case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter))) + case And(leftFilter, rightFilter) => { + val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(leftFilter, rightFilter)) + if (currUnsupportedFilters.isEmpty) + filter.append(whereClause) + else + unsupportedFilters :+ f + } + case Or(leftFilter, rightFilter) => { + val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter)) + val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter)) + if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) { + filter.append(whereLeftClause + " OR " + whereRightClause) + } + else { + unsupportedFilters :+ f + } + } + case Not(aFilter) => { + val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(aFilter)) + if (currUnsupportedFilters.isEmpty) + filter.append(" NOT " + whereClause) + else + unsupportedFilters :+ f + } case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}") case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}") case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}") @@ -100,21 +93,15 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo i = i + 1 }) - (filter.toString(), unsupportedFilters) - } - - override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { - val(pushedFilters, unhandledFilters) = buildFilter(filters) - unhandledFilters + (filter.toString(), unsupportedFilters, filters diff 
unsupportedFilters) } - // Helper function to escape column key to work with SQL queries - private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key) - // Helper function to escape string values in SQL queries private def compileValue(value: Any): Any = value match { case stringValue: String => s"'${escapeStringConstant(stringValue)}'" + case timestampValue: Timestamp => getTimestampString(timestampValue) + // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions // Spark 1.4 case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'" @@ -125,6 +112,14 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo case _ => value } + private def getTimestampString(timestampValue: Timestamp): String = { + "TO_TIMESTAMP('%s', '%s', '%s')".format(timeformatter.format(timestampValue), + DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID) + } + + // Helper function to escape column key to work with SQL queries + private def escapeKey(key: String): String = SchemaUtil.getEscapedFullColumnName(key) + private def isClass(obj: Any, className: String) = { className.equals(obj.getClass().getName()) } diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala index d604e0e..7331a5f 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala @@ -21,9 +21,6 @@ import org.apache.hadoop.io.NullWritable import org.apache.phoenix.jdbc.PhoenixDriver import org.apache.phoenix.mapreduce.PhoenixInputFormat import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil -import org.apache.phoenix.query.QueryConstants -import org.apache.phoenix.schema.types._ -import org.apache.phoenix.util.{ColumnInfo, SchemaUtil} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD @@ -32,6 +29,7 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext} import scala.collection.JavaConverters._ +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], predicate: Option[String] = None, zkUrl: Option[String] = None, @@ -126,7 +124,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], // Lookup the Spark catalyst types from the Phoenix schema - val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray + val structType = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoList, dateAsTimestamp) // Create the data frame from the converted Spark schema sqlContext.createDataFrame(map(pr => { @@ -146,60 +144,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], // Create a Spark Row from the sequence Row.fromSeq(rowSeq) - }), new StructType(structFields)) + }), structType) } - def normalizeColumnName(columnName: String) = { - val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName) - var normalizedColumnName = "" - if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) { - normalizedColumnName = unescapedColumnName - } - else { - // split by separator to get the column family and column name - val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX) - normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName - } - 
normalizedColumnName - } - - def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = columnList.map(ci => { - val structType = phoenixTypeToCatalystType(ci) - StructField(normalizeColumnName(ci.getColumnName), structType) - }) - - - // Lookup table for Phoenix types to Spark catalyst types - def phoenixTypeToCatalystType(columnInfo: ColumnInfo): DataType = columnInfo.getPDataType match { - case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType - case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType - case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType - case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType - case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType - case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType - case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType - // Use Spark system default precision for now (explicit to work with < 1.5) - case t if t.isInstanceOf[PDecimal] => - if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale) - case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType - case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType - case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType - case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType - case t if t.isInstanceOf[PBoolean] => BooleanType - case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType - case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true) - case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true) - case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true) - case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true) - case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true) - case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true) - case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true) - case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true) - case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true) - case t if t.isInstanceOf[PDecimalArray] => ArrayType( - if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true) - case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true) - case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true) - case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true) - } } diff --git 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala index c35cc54..6d4c4cc 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala @@ -20,7 +20,7 @@ import org.apache.phoenix.util.ColumnInfo import org.joda.time.DateTime import scala.collection.{mutable, immutable} - +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class PhoenixRecordWritable(columnMetaDataList: List[ColumnInfo]) extends DBWritable { val upsertValues = mutable.ArrayBuffer[Any]() val resultMap = mutable.Map[String, AnyRef]() diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala index 38bf29a..2f6ea8c 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala @@ -19,12 +19,11 @@ package org.apache.phoenix.spark import org.apache.hadoop.conf.Configuration import org.apache.spark.rdd.RDD +import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.sources._ -import org.apache.phoenix.util.StringUtil.escapeStringConstant -import org.apache.phoenix.util.SchemaUtil +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan { @@ -36,7 +35,7 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo but this prevents having to load the whole table into Spark first. 
*/ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - val(pushedFilters, unhandledFilters) = buildFilter(filters) + val(pushedFilters, _, _) = new FilterExpressionCompiler().pushFilters(filters) new PhoenixRDD( sqlContext.sparkContext, tableName, @@ -61,71 +60,10 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo ).toDataFrame(sqlContext).schema } - // Attempt to create Phoenix-accepted WHERE clauses from Spark filters, - // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector - private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = { - if (filters.isEmpty) { - return ("" , Array[Filter]()) - } - - val filter = new StringBuilder("") - val unsupportedFilters = Array[Filter](); - var i = 0 - - filters.foreach(f => { - // Assume conjunction for multiple filters, unless otherwise specified - if (i > 0) { - filter.append(" AND") - } - - f match { - // Spark 1.3.1+ supported filters - case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter))) - case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter))) - case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter))) - case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}") - case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}") - case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}") - case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}") - case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}") - case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL") - case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL") - case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}") - case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}") - case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}") - case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}") - case _ => unsupportedFilters :+ f - } - - i = i + 1 - }) - - (filter.toString(), unsupportedFilters) - } override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { - val(pushedFilters, unhandledFilters) = buildFilter(filters) + val (_, unhandledFilters, _) = new FilterExpressionCompiler().pushFilters(filters) unhandledFilters } - // Helper function to escape column key to work with SQL queries - private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key) - - // Helper function to escape string values in SQL queries - private def compileValue(value: Any): Any = value match { - case stringValue: String => s"'${escapeStringConstant(stringValue)}'" - - // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions - // Spark 1.4 - case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'" - // Spark 1.5 - case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'" - - // Pass through anything else - case _ => value - } - - private def isClass(obj: Any, 
className: String) = { - className.equals(obj.getClass().getName()) - } } diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala index 1b33e6e..b073521 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import scala.collection.JavaConversions._ +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Serializable { def saveToPhoenix(tableName: String, cols: Seq[String], diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala index 476ce8a..1b377ab 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable { /* diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala new file mode 100644 index 0000000..f69e988 --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark + +import org.apache.phoenix.query.QueryConstants +import org.apache.phoenix.schema.types._ +import org.apache.phoenix.util.{ColumnInfo, SchemaUtil} +import org.apache.spark.sql.types._ + +object SparkSchemaUtil { + + def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = { + val structFields = columnList.map(ci => { + val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp) + StructField(normalizeColumnName(ci.getColumnName), structType) + }) + new StructType(structFields.toArray) + } + + def normalizeColumnName(columnName: String) = { + val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName) + var normalizedColumnName = "" + if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) { + normalizedColumnName = unescapedColumnName + } + else { + // split by separator to get the column family and column name + val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX) + normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName + } + normalizedColumnName + } + + + // Lookup table for Phoenix types to Spark catalyst types + def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match { + case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType + case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType + case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType + case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType + case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType + case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType + case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType + // Use Spark system default precision for now (explicit to work with < 1.5) + case t if t.isInstanceOf[PDecimal] => + if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale) + case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType + case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType + case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType + case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType + case t if t.isInstanceOf[PBoolean] => BooleanType + case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType + case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true) + case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true) + case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true) + case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true) + case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true) + case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true) + case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true) + case t if 
t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true) + case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true) + case t if t.isInstanceOf[PDecimalArray] => ArrayType( + if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true) + case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true) + case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true) + case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true) + } + +} diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala index a0842c9..f9154ad 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala @@ -16,6 +16,7 @@ package org.apache.phoenix.spark import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.{DataFrame, SQLContext} +@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)") class SparkSqlContextFunctions(@transient val sqlContext: SQLContext) extends Serializable { /* diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala new file mode 100644 index 0000000..712ec2d --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala @@ -0,0 +1,21 @@ +package org.apache.spark.sql.execution.datasources.jdbc + +import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType} +import org.apache.spark.sql.types._ + +private object PhoenixJdbcDialect extends JdbcDialect { + + override def canHandle(url: String): Boolean = url.startsWith("jdbc:phoenix") + + /** + * This is only called for ArrayType (see JdbcUtils.makeSetter) + */ + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case StringType => Some(JdbcType("VARCHAR", java.sql.Types.VARCHAR)) + case BinaryType => Some(JdbcType("BINARY(" + dt.defaultSize + ")", java.sql.Types.BINARY)) + case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT)) + case _ => None + } + + +} \ No newline at end of file diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala new file mode 100644 index 0000000..eac483a --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
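A small sketch of the SparkSchemaUtil mapping above; the ColumnInfo values are hypothetical stand-ins for what PhoenixRuntime.generateColumnInfo returns, and the expected StructType is shown only as a comment:

  import org.apache.phoenix.schema.types.{PDate, PLong, PVarchar}
  import org.apache.phoenix.spark.SparkSchemaUtil
  import org.apache.phoenix.util.ColumnInfo

  // Hypothetical column metadata: a BIGINT key, a VARCHAR in the default
  // column family ("0", which normalizeColumnName strips), and a DATE column.
  val cols = Seq(
    new ColumnInfo("ID", PLong.INSTANCE.getSqlType),
    new ColumnInfo("0.NAME", PVarchar.INSTANCE.getSqlType),
    new ColumnInfo("CREATED", PDate.INSTANCE.getSqlType))

  val schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(cols, dateAsTimestamp = false)
  // Roughly: StructType(StructField(ID,LongType), StructField(NAME,StringType), StructField(CREATED,DateType))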
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.sql.{Connection, PreparedStatement, ResultSet} +import java.util.Locale + +import org.apache.spark.executor.InputMetrics +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._ +import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.NextIterator + +object SparkJdbcUtil { + + def toRow(schema: StructType, internalRow: InternalRow) : Row = { + val encoder = RowEncoder(schema).resolveAndBind() + encoder.fromRow(internalRow) + } + + // A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field + // for `MutableRow`. The last argument `Int` means the index for the value to be set in + // the row and also used for the value in `ResultSet`. + private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit + + private def nullSafeConvert[T](input: T, f: T => Any): Any = { + if (input == null) { + null + } else { + f(input) + } + } + + /** + * Creates `JDBCValueGetter`s according to [[StructType]], which can set + * each value from `ResultSet` to each field of [[InternalRow]] correctly. + */ + private def makeGetters(schema: StructType): Array[JDBCValueGetter] = + schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata)) + + private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match { + case BooleanType => + (rs: ResultSet, row: InternalRow, pos: Int) => + row.setBoolean(pos, rs.getBoolean(pos + 1)) + + case DateType => + (rs: ResultSet, row: InternalRow, pos: Int) => + // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it. + val dateVal = rs.getDate(pos + 1) + if (dateVal != null) { + row.setInt(pos, DateTimeUtils.fromJavaDate(dateVal)) + } else { + row.update(pos, null) + } + + // When connecting with Oracle DB through JDBC, the precision and scale of BigDecimal + // object returned by ResultSet.getBigDecimal is not correctly matched to the table + // schema reported by ResultSetMetaData.getPrecision and ResultSetMetaData.getScale. + // If inserting values like 19999 into a column with NUMBER(12, 2) type, you get through + // a BigDecimal object with scale as 0. But the dataframe schema has correct type as + // DecimalType(12, 2). Thus, after saving the dataframe into parquet file and then + // retrieve it, you will get wrong result 199.99. + // So it is needed to set precision and scale for Decimal based on JDBC metadata. 
+    case DecimalType.Fixed(p, s) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val decimal =
+          nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
+        row.update(pos, decimal)
+
+    case DoubleType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setDouble(pos, rs.getDouble(pos + 1))
+
+    case FloatType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setFloat(pos, rs.getFloat(pos + 1))
+
+    case IntegerType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setInt(pos, rs.getInt(pos + 1))
+
+    case LongType if metadata.contains("binarylong") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val bytes = rs.getBytes(pos + 1)
+        var ans = 0L
+        var j = 0
+        while (j < bytes.length) {
+          ans = 256 * ans + (255 & bytes(j))
+          j = j + 1
+        }
+        row.setLong(pos, ans)
+
+    case LongType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setLong(pos, rs.getLong(pos + 1))
+
+    case ShortType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setShort(pos, rs.getShort(pos + 1))
+
+    case StringType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
+        row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
+
+    case TimestampType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+        } else {
+          row.update(pos, null)
+        }
+
+    case BinaryType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getBytes(pos + 1))
+
+    case ByteType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getByte(pos + 1))
+
+    case ArrayType(et, _) =>
+      val elementConversion = et match {
+        case TimestampType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
+              nullSafeConvert(timestamp, DateTimeUtils.fromJavaTimestamp)
+            }
+
+        case StringType =>
+          (array: Object) =>
+            // some underlying types are not String, such as uuid, inet, cidr, etc.
+            array.asInstanceOf[Array[java.lang.Object]]
+              .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
+
+        case DateType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.sql.Date]].map { date =>
+              nullSafeConvert(date, DateTimeUtils.fromJavaDate)
+            }
+
+        case dt: DecimalType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
+              nullSafeConvert[java.math.BigDecimal](
+                decimal, d => Decimal(d, dt.precision, dt.scale))
+            }
+
+        case LongType if metadata.contains("binarylong") =>
+          throw new IllegalArgumentException(s"Unsupported array element " +
+            s"type ${dt.catalogString} based on binary")
+
+        case ArrayType(_, _) =>
+          throw new IllegalArgumentException("Nested arrays unsupported")
+
+        case _ => (array: Object) => array.asInstanceOf[Array[Any]]
+      }
+
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val array = nullSafeConvert[java.sql.Array](
+          input = rs.getArray(pos + 1),
+          array => new GenericArrayData(elementConversion.apply(array.getArray)))
+        row.update(pos, array)
+
+    case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.catalogString}")
+  }
+
+  // TODO just use JdbcUtils.resultSetToSparkInternalRows in Spark 3.0 (see SPARK-26499)
+  def resultSetToSparkInternalRows(
+      resultSet: ResultSet,
+      schema: StructType,
+      inputMetrics: InputMetrics): Iterator[InternalRow] = {
+    // JdbcUtils.resultSetToSparkInternalRows(resultSet, schema, inputMetrics)
+    new NextIterator[InternalRow] {
+      private[this] val rs = resultSet
+      private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
+      private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
+
+      override protected def close(): Unit = {
+        try {
+          rs.close()
+        } catch {
+          case e: Exception =>
+        }
+      }
+
+      override protected def getNext(): InternalRow = {
+        if (rs.next()) {
+          inputMetrics.incRecordsRead(1)
+          var i = 0
+          while (i < getters.length) {
+            getters(i).apply(rs, mutableRow, i)
+            if (rs.wasNull) mutableRow.setNullAt(i)
+            i = i + 1
+          }
+          mutableRow
+        } else {
+          finished = true
+          null.asInstanceOf[InternalRow]
+        }
+      }
+    }
+  }
+
+  // A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
+  // `PreparedStatement`. The last argument `Int` means the index for the value to be set
+  // in the SQL statement and also used for the value in `Row`.
+  private type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit
+
+  // taken from Spark's JdbcUtils.scala; it cannot be used directly because the method is private
+  def makeSetter(
+      conn: Connection,
+      dialect: JdbcDialect,
+      dataType: DataType): JDBCValueSetter = dataType match {
+    case IntegerType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getInt(pos))
+
+    case LongType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setLong(pos + 1, row.getLong(pos))
+
+    case DoubleType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setDouble(pos + 1, row.getDouble(pos))
+
+    case FloatType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setFloat(pos + 1, row.getFloat(pos))
+
+    case ShortType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getShort(pos))
+
+    case ByteType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getByte(pos))
+
+    case BooleanType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBoolean(pos + 1, row.getBoolean(pos))
+
+    case StringType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setString(pos + 1, row.getString(pos))
+
+    case BinaryType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
+
+    case TimestampType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+
+    case DateType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+
+    case t: DecimalType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBigDecimal(pos + 1, row.getDecimal(pos))
+
+    case ArrayType(et, _) =>
+      // remove type length parameters from end of type name
+      val typeName = getJdbcType(et, dialect).databaseTypeDefinition
+        .toLowerCase(Locale.ROOT).split("\\(")(0)
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val array = conn.createArrayOf(
+          typeName,
+          row.getSeq[AnyRef](pos).toArray)
+        stmt.setArray(pos + 1, array)
+
+    case _ =>
+      (_: PreparedStatement, _: Row, pos: Int) =>
+        throw new IllegalArgumentException(
+          s"Can't translate non-null value for field $pos")
+  }
+
+  // taken from Spark JdbcUtils
+  def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
+    dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
+      throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index e817200..70b3eb1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@
     <jettyVersion>8.1.7.v20120910</jettyVersion>
     <tephra.version>0.15.0-incubating</tephra.version>
     <omid.version>1.0.0</omid.version>
-    <spark.version>2.3.2</spark.version>
+    <spark.version>2.4.0</spark.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <stream.version>2.9.5</stream.version>
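Since the diffstat adds a v2 write path (PhoenixDataSource, PhoenixDataWriter, PhoenixDatasourceWriter) alongside the reader, a minimal Scala sketch of saving a DataFrame through the new connector may help readers who currently rely on the now-deprecated implicits. This is illustrative only: it assumes a Phoenix table OUTPUT_TABLE already exists with matching columns, a ZooKeeper quorum at localhost:2181, and that SaveMode.Overwrite is the mode the v2 writer accepts; only the "phoenix" format name and the option constants come from this commit.

    import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
    import org.apache.spark.sql.sources.v2.DataSourceOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object PhoenixV2WriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("phoenix-datasource-v2-sketch")
          .master("local[*]")
          .getOrCreate()

        // Hypothetical table: CREATE TABLE OUTPUT_TABLE (ID BIGINT NOT NULL PRIMARY KEY, NAME VARCHAR)
        val df = spark.createDataFrame(Seq((1L, "example"), (2L, "data"))).toDF("ID", "NAME")

        df.write
          .format("phoenix")
          .mode(SaveMode.Overwrite)
          .option(DataSourceOptions.TABLE_KEY, "OUTPUT_TABLE")
          .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
          .save()

        spark.stop()
      }
    }

Under the hood, each partition's writer upserts rows over the Phoenix JDBC driver, binding values per Catalyst type in the same way SparkJdbcUtil.makeSetter above does.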