PHOENIX-1815 - Spark Datasource API
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3fb3bb4d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3fb3bb4d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3fb3bb4d Branch: refs/heads/calcite Commit: 3fb3bb4d2231972dc06326251b76cc1431da7386 Parents: e1bbb94 Author: ravimagham <ravimag...@apache.org> Authored: Wed Apr 15 19:03:33 2015 -0700 Committer: ravimagham <ravimag...@apache.org> Committed: Wed Apr 15 19:03:33 2015 -0700 ---------------------------------------------------------------------- phoenix-assembly/pom.xml | 6 +- .../src/build/components/all-common-jars.xml | 11 ++ phoenix-spark/README.md | 74 ++++++++-- phoenix-spark/pom.xml | 6 - phoenix-spark/src/it/resources/setup.sql | 1 + .../apache/phoenix/spark/PhoenixSparkIT.scala | 135 ++++++++++++++++--- .../phoenix/spark/ConfigurationUtil.scala | 65 +++++++++ .../phoenix/spark/DataFrameFunctions.scala | 51 +++++++ .../apache/phoenix/spark/DefaultSource.scala | 41 ++++++ .../org/apache/phoenix/spark/PhoenixRDD.scala | 12 +- .../phoenix/spark/PhoenixRecordWritable.scala | 2 +- .../apache/phoenix/spark/PhoenixRelation.scala | 80 +++++++++++ .../phoenix/spark/ProductRDDFunctions.scala | 21 +-- .../org/apache/phoenix/spark/package.scala | 6 +- pom.xml | 5 + 15 files changed, 453 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml index 51f767f..b3a992e 100644 --- a/phoenix-assembly/pom.xml +++ b/phoenix-assembly/pom.xml @@ -142,9 +142,13 @@ <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-flume</artifactId> </dependency> - <dependency> + <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-pig</artifactId> </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-spark</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-assembly/src/build/components/all-common-jars.xml ---------------------------------------------------------------------- diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml index ce6da59..769e28f 100644 --- a/phoenix-assembly/src/build/components/all-common-jars.xml +++ b/phoenix-assembly/src/build/components/all-common-jars.xml @@ -71,5 +71,16 @@ </excludes> <fileMode>0644</fileMode> </fileSet> + <fileSet> + <directory>${project.basedir}/../phoenix-spark/target/</directory> + <outputDirectory>lib</outputDirectory> + <includes> + <include>phoenix-*.jar</include> + </includes> + <excludes> + <exclude></exclude> + </excludes> + <fileMode>0644</fileMode> + </fileSet> </fileSets> </component> http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/README.md ---------------------------------------------------------------------- diff --git a/phoenix-spark/README.md b/phoenix-spark/README.md index 1c030f8..1e53c98 100644 --- a/phoenix-spark/README.md +++ b/phoenix-spark/README.md @@ -11,7 +11,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1'); UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2'); ``` -### Load as a DataFrame +### Load as a DataFrame using the Data Source API ```scala import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext @@ -20,15 +20,39 @@ import org.apache.phoenix.spark._ val sc = new SparkContext("local", "phoenix-test") val sqlContext = new SQLContext(sc) +val df = sqlContext.load( + "org.apache.phoenix.spark", + Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181") +) + +df + .filter(df("COL1") === "test_row_1" && df("ID") === 1L) + .select(df("ID")) + .show +``` + +### Load as a DataFrame directly using a Configuration object +```scala +import org.apache.hadoop.conf.Configuration +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import org.apache.phoenix.spark._ + +val configuration = new Configuration() +// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum' + +val sc = new SparkContext("local", "phoenix-test") +val sqlContext = new SQLContext(sc) + // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame val df = sqlContext.phoenixTableAsDataFrame( - "TABLE1", Array("ID", "COL1"), zkUrl = Some("phoenix-server:2181") + "TABLE1", Array("ID", "COL1"), conf = configuration ) df.show ``` -### Load as an RDD +### Load as an RDD, using a Zookeeper URL ```scala import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext @@ -47,7 +71,10 @@ val firstId = rdd1.first()("ID").asInstanceOf[Long] val firstCol = rdd1.first()("COL1").asInstanceOf[String] ``` -## Saving RDDs to Phoenix +## Saving RDDs to Phoenix + +`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must +correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html) Given a Phoenix table with the following DDL @@ -55,9 +82,6 @@ Given a Phoenix table with the following DDL CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER); ``` -`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must -correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html) - ```scala import org.apache.spark.SparkContext import org.apache.phoenix.spark._ @@ -74,6 +98,38 @@ sc ) ``` +## Saving DataFrames to Phoenix + +The `save` method on DataFrame allows passing in a data source type. You can use +`org.apache.phoenix.spark`, and must also pass in a `table` and `zkUrl` parameter to +specify which table and server to persist the DataFrame to. The column names are derived from +the DataFrame's schema field names, and must match the Phoenix column names. + +The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported. + +Given two Phoenix tables with the following DDL: + +```sql +CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER); +CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER); +``` + +```scala +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SaveMode +import org.apache.phoenix.spark._ + +// Load INPUT_TABLE +val sc = new SparkContext("local", "phoenix-test") +val sqlContext = new SQLContext(sc) +val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE", + "zkUrl" -> hbaseConnectionString)) + +// Save to OUTPUT_TABLE +df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "OUTPUT_TABLE", + "zkUrl" -> hbaseConnectionString)) +``` + ## Notes The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support @@ -85,5 +141,7 @@ in the `conf` parameter.
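For example, a minimal sketch of the two equivalent ways to supply the ZooKeeper quorum to `phoenixTableAsRDD` (using the same `TABLE1` and `phoenix-server:2181` placeholders as the examples above):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HConstants
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")

// Option 1: set 'hbase.zookeeper.quorum' on a Configuration and pass it as 'conf'
val conf = new Configuration()
conf.set(HConstants.ZOOKEEPER_QUORUM, "phoenix-server:2181")
val rdd1 = sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"), conf = conf)

// Option 2: omit the Configuration and pass the address directly as 'zkUrl'
val rdd2 = sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181"))
```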
Similarly, if no configuration is passed in, `zkUrl` must be specified. ## Limitations -- No pushdown predicate support from Spark SQL (yet) +- Basic support for column and predicate pushdown using the Data Source API +- The Data Source API does not support passing custom Phoenix settings in configuration; you must +create the DataFrame or RDD directly if you need fine-grained configuration. - No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml index 8b06cf7..adeed88 100644 --- a/phoenix-spark/pom.xml +++ b/phoenix-spark/pom.xml @@ -97,12 +97,6 @@ </dependency> <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <version>1.1.1.6</version> - </dependency> - - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop-two.version}</version> http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/it/resources/setup.sql ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql index ce74c58..40157a2 100644 --- a/phoenix-spark/src/it/resources/setup.sql +++ b/phoenix-spark/src/it/resources/setup.sql @@ -15,6 +15,7 @@ -- limitations under the License. CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR) +CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR) CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR) UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1') UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1') http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index 149baec..db99f65 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -17,14 +17,14 @@ import java.sql.{Connection, DriverManager} import java.util.Date import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility} +import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility} import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT import org.apache.phoenix.query.BaseTest import org.apache.phoenix.schema.ColumnNotFoundException import org.apache.phoenix.schema.types.PVarchar import org.apache.phoenix.util.ColumnInfo -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.types.{StringType, StructField} +import org.apache.spark.sql.{SaveMode, execution, SQLContext} +import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField} import org.apache.spark.{SparkConf, SparkContext} import org.joda.time.DateTime import org.scalatest._ @@ -139,7 +139,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { df2.registerTempTable("sql_table_2") - val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM 
sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)") + val sqlRdd = sqlContext.sql(""" + |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 + |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin + ) val count = sqlRdd.count() @@ -149,7 +152,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { test("Can create schema RDD and execute query on case sensitive table (no config)") { val sqlContext = new SQLContext(sc) - val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(quorumAddress)) + + val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), + zkUrl = Some(quorumAddress)) df1.registerTempTable("table3") @@ -163,7 +168,8 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { test("Can create schema RDD and execute constrained query") { val sqlContext = new SQLContext(sc) - val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration) + val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), + conf = hbaseConfiguration) df1.registerTempTable("sql_table_1") @@ -173,7 +179,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { df2.registerTempTable("sql_table_2") - val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)") + val sqlRdd = sqlContext.sql(""" + |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 + |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin + ) val count = sqlRdd.count() @@ -194,7 +203,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { // we have to execute an action before the predicate failure can occur val count = sqlRdd.count() - }.getCause shouldBe a [ColumnNotFoundException] + }.getCause shouldBe a[ColumnNotFoundException] } test("Can create schema RDD with predicate that will never match") { @@ -216,10 +225,15 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { test("Can create schema RDD with complex predicate") { val sqlContext = new SQLContext(sc) - val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"), - predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"), + val df1 = sqlContext.phoenixTableAsDataFrame( + "DATE_PREDICATE_TEST_TABLE", + Array("ID", "TIMESERIES_KEY"), + predicate = Some(""" + |ID > 0 AND TIMESERIES_KEY BETWEEN + |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND + |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin), conf = hbaseConfiguration) - + df1.registerTempTable("date_predicate_test_table") val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table") @@ -248,7 +262,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { count shouldEqual 1L } - + test("Can read a table as an RDD") { val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"), conf = hbaseConfiguration) @@ -271,7 +285,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { .parallelize(dataSet) .saveToPhoenix( "OUTPUT_TEST_TABLE", - Seq("ID","COL1","COL2"), + Seq("ID", "COL1", "COL2"), hbaseConfiguration 
) @@ -279,7 +293,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { val stmt = conn.createStatement() val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE") val results = ListBuffer[(Long, String, Int)]() - while(rs.next()) { + while (rs.next()) { results.append((rs.getLong(1), rs.getString(2), rs.getInt(3))) } @@ -306,7 +320,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { val stmt = conn.createStatement() val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC") val results = ListBuffer[java.sql.Date]() - while(rs.next()) { + while (rs.next()) { results.append(rs.getDate(1)) } @@ -315,12 +329,89 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { results(1).getTime shouldEqual date.getTime } - test("Not specifying a zkUrl or a config quorum URL should fail") { - intercept[UnsupportedOperationException] { - val sqlContext = new SQLContext(sc) - val badConf = new Configuration(hbaseConfiguration) - badConf.unset(HConstants.ZOOKEEPER_QUORUM) - sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf) + test("Can infer schema without defining columns") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration) + df.schema("ID").dataType shouldEqual LongType + df.schema("TABLE1_ID").dataType shouldEqual LongType + df.schema("t2col1").dataType shouldEqual StringType + } + + test("Spark SQL can use Phoenix as a data source with no schema specified") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", + "zkUrl" -> quorumAddress)) + df.count() shouldEqual 2 + df.schema("ID").dataType shouldEqual LongType + df.schema("COL1").dataType shouldEqual StringType + } + + test("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", + "zkUrl" -> quorumAddress)) + val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID")) + + // Make sure we got the right value back + assert(res.first().getLong(0) == 1L) + + /* + NOTE: There doesn't appear to be any way of verifying from the Spark query planner that + filtering is being pushed down and done server-side. However, since PhoenixRelation + implements PrunedFilteredScan, debugging has shown that both the SELECT columns and WHERE + predicates are being passed along to us, which we then forward to Phoenix. 
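+ For example, a df.filter(df("ID") === 1L) predicate reaches buildScan as Array(EqualTo("ID", 1L)), which PhoenixRelation.buildFilter compiles into the WHERE clause " ID = 1" before the query is handed off to PhoenixRDD.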
+ TODO: investigate further to find a way to verify server-side pushdown + */ + } + + test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") { + // Load from TABLE1 + val sqlContext = new SQLContext(sc) + val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", + "zkUrl" -> quorumAddress)) + + // Save to TABLE1_COPY + df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress)) + + // Verify results + val stmt = conn.createStatement() + val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY") + + val checkResults = List((1L, "test_row_1"), (2L, "test_row_2")) + val results = ListBuffer[(Long, String)]() + while (rs.next()) { + results.append((rs.getLong(1), rs.getString(2))) } + stmt.close() + + results.toList shouldEqual checkResults } -} + + test("Can persist a dataframe using 'DataFrame.save()'") { + // Clear TABLE1_COPY + var stmt = conn.createStatement() + stmt.executeUpdate("DELETE FROM TABLE1_COPY") + stmt.close() + + // Load TABLE1, save as TABLE1_COPY + val sqlContext = new SQLContext(sc) + val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", + "zkUrl" -> quorumAddress)) + + // Save to TABLE1_COPY + df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "TABLE1_COPY", "zkUrl" -> quorumAddress)) + + // Verify results + stmt = conn.createStatement() + val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY") + + val checkResults = List((1L, "test_row_1"), (2L, "test_row_2")) + val results = ListBuffer[(Long, String)]() + while (rs.next()) { + results.append((rs.getLong(1), rs.getString(2))) + } + stmt.close() + + results.toList shouldEqual checkResults + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala new file mode 100644 index 0000000..c0c7248 --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala @@ -0,0 +1,65 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.phoenix.spark + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants} +import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil} +import org.apache.phoenix.util.ColumnInfo +import scala.collection.JavaConversions._ + +object ConfigurationUtil extends Serializable { + + def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], conf: Option[Configuration]): Configuration = { + + // Create an HBaseConfiguration object from the passed-in config, if present + val config = conf match { + case Some(c) => HBaseConfiguration.create(c) + case _ => HBaseConfiguration.create() + } + + // Set the table to save to + PhoenixConfigurationUtil.setOutputTableName(config, tableName) + + // Set the column names to upsert + PhoenixConfigurationUtil.setUpsertColumnNames(config, columns.mkString(",")) + + // Override the Zookeeper URL if present. Throw exception if no address given. + zkUrl match { + case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url) + case _ => { + if (config.get(HConstants.ZOOKEEPER_QUORUM) == null) { + throw new UnsupportedOperationException( + s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided" + ) + } + } + } + + // Return the configuration object + config + } + + // Return a serializable representation of the columns + def encodeColumns(conf: Configuration): String = { + ColumnInfoToStringEncoderDecoder.encode( + PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf) + ) + } + + // Decode the columns to a list of ColumnInfo objects + def decodeColumns(encodedColumns: String): List[ColumnInfo] = { + ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala new file mode 100644 index 0000000..e17d7a5 --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala @@ -0,0 +1,51 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.phoenix.spark + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.HConstants +import org.apache.hadoop.io.NullWritable +import org.apache.phoenix.mapreduce.PhoenixOutputFormat +import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil} +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame + +class DataFrameFunctions(data: DataFrame) extends Logging with Serializable { + + def saveToPhoenix(tableName: String, conf: Configuration = new Configuration, + zkUrl: Option[String] = None): Unit = { + + val config = ConfigurationUtil.getOutputConfiguration(tableName, data.schema.fieldNames, zkUrl, Some(conf)) + + // Encode the column info to a serializable type + val encodedColumns = ConfigurationUtil.encodeColumns(config) + + // Map the row object into a PhoenixRecordWritable + val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { row => + val rec = new PhoenixRecordWritable(encodedColumns) + row.toSeq.foreach { e => rec.add(e) } + (null, rec) + } + + // Save it + phxRDD.saveAsNewAPIHadoopFile( + "", + classOf[NullWritable], + classOf[PhoenixRecordWritable], + classOf[PhoenixOutputFormat[PhoenixRecordWritable]], + config + ) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala new file mode 100644 index 0000000..b0e9754 --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala @@ -0,0 +1,41 @@ +package org.apache.phoenix.spark + +import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} +import org.apache.spark.sql.sources.{CreatableRelationProvider, BaseRelation, RelationProvider} +import org.apache.phoenix.spark._ + +class DefaultSource extends RelationProvider with CreatableRelationProvider { + + // Override 'RelationProvider.createRelation'; this enables DataFrame.load() + override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { + verifyParameters(parameters) + + new PhoenixRelation( + parameters("table"), + parameters("zkUrl") + )(sqlContext) + } + + // Override 'CreatableRelationProvider.createRelation'; this enables DataFrame.save() + override def createRelation(sqlContext: SQLContext, mode: SaveMode, + parameters: Map[String, String], data: DataFrame): BaseRelation = { + + if (!mode.equals(SaveMode.Overwrite)) { + throw new Exception("SaveMode other than SaveMode.Overwrite is not supported") + } + + verifyParameters(parameters) + + // Save the DataFrame to Phoenix + data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl")) + + // Return a relation of the saved data + createRelation(sqlContext, parameters) + } + + // Ensure the required parameters are present + def verifyParameters(parameters: Map[String, String]): Unit = { + if (parameters.get("table").isEmpty) throw new RuntimeException("No Phoenix 'table' option defined") + if (parameters.get("zkUrl").isEmpty) throw new RuntimeException("No Phoenix 'zkUrl' option defined") + } +}
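Because `DefaultSource` implements Spark's `RelationProvider`, Spark SQL's generic `CREATE TEMPORARY TABLE ... USING` DDL should also be able to register a Phoenix-backed table — a sketch of that usage, not exercised by this patch (the table name and zkUrl are the usual placeholders):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// The OPTIONS map is routed to DefaultSource.createRelation, which builds a PhoenixRelation
sqlContext.sql(
  """CREATE TEMPORARY TABLE phoenix_table1
    |USING org.apache.phoenix.spark
    |OPTIONS (table 'TABLE1', zkUrl 'phoenix-server:2181')""".stripMargin)

// Queries against it go through PhoenixRelation's PrunedFilteredScan
sqlContext.sql("SELECT ID, COL1 FROM phoenix_table1 WHERE ID = 1").show()
```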
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala index b27f9f9..9a359e3 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala @@ -14,7 +14,7 @@ package org.apache.phoenix.spark import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.HConstants +import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants} import org.apache.hadoop.io.NullWritable import org.apache.phoenix.mapreduce.PhoenixInputFormat import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil @@ -65,12 +65,12 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = { val query = "SELECT %s FROM \"%s\"".format( - columns.map(f => "\"" + f + "\"").mkString(", "), + if (columns.isEmpty) "*" else columns.map(f => "\"" + f + "\"").mkString(", "), table ) query + (predicate match { - case Some(p: String) => " WHERE " + p + case Some(p: String) if p.length > 0 => " WHERE " + p case _ => "" }) } @@ -79,10 +79,12 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], // This is simply not serializable, so don't try, but clone it because // PhoenixConfigurationUtil mutates it. - val config = new Configuration(conf) + val config = HBaseConfiguration.create(conf) PhoenixConfigurationUtil.setInputQuery(config, buildSql(table, columns, predicate)) - PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(",")) + if (!columns.isEmpty) { + PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(",")) + } PhoenixConfigurationUtil.setInputTableName(config, "\"" + table + "\"") PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable]) http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala index 48a70ec..67e0bd2 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala @@ -31,7 +31,7 @@ class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable { override def write(statement: PreparedStatement): Unit = { // Decode the ColumnInfo list - val columns = ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList + val columns = ConfigurationUtil.decodeColumns(encodedColumns) // Make sure we at least line up in size if(upsertValues.length != columns.length) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala new file mode 100644 index 0000000..4177022 --- /dev/null +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala @@ -0,0 +1,80 @@ +package org.apache.phoenix.spark
+ +import org.apache.hadoop.conf.Configuration +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources._ +import org.apache.commons.lang.StringEscapeUtils.escapeSql + +case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlContext: SQLContext) + extends BaseRelation with PrunedFilteredScan { + + /* + This is the buildScan() implementation of Spark's PrunedFilteredScan. + Spark SQL queries with columns or predicates specified will be pushed down + to us here, and we can pass that on to Phoenix. According to the docs, this + is an optimization, and the filtering/pruning will be re-evaluated by Spark, + but this prevents having to load the whole table into Spark first. + */ + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + new PhoenixRDD( + sqlContext.sparkContext, + tableName, + requiredColumns, + Some(buildFilter(filters)), + Some(zkUrl), + new Configuration() + ).toDataFrame(sqlContext).rdd + } + + // Required by BaseRelation, this will return the full schema for a table + override def schema: StructType = { + new PhoenixRDD( + sqlContext.sparkContext, + tableName, + Seq(), + None, + Some(zkUrl), + new Configuration() + ).toDataFrame(sqlContext).schema + } + + // Attempt to create Phoenix-accepted WHERE clauses from Spark filters, + // mostly inspired by Spark SQL JDBCRDD and the couchbase-spark-connector + private def buildFilter(filters: Array[Filter]): String = { + if (filters.isEmpty) { + return "" + } + + val filter = new StringBuilder("") + var i = 0 + + filters.foreach(f => { + if (i > 0) { + filter.append(" AND") + } + + f match { + case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}") + case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}") + case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}") + case LessThan(attr, value) => filter.append(s" $attr < ${compileValue(value)}") + case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}") + case IsNull(attr) => filter.append(s" $attr IS NULL") + case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL") + case _ => throw new Exception("Unsupported filter") + } + + i = i + 1 + }) + + filter.toString() + } + + // Helper function to escape string values in SQL queries + private def compileValue(value: Any): Any = value match { + case stringValue: String => s"'${escapeSql(stringValue)}'" + case _ => value + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala index 2926569..3d24fb9 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala @@ -27,27 +27,10 @@ class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Seria conf: Configuration = new Configuration, zkUrl: Option[String] = None) : Unit = { - // Setup Phoenix output configuration, make a local copy - val config = new Configuration(conf) - PhoenixConfigurationUtil.setOutputTableName(config, 
tableName) - PhoenixConfigurationUtil.setUpsertColumnNames(config, cols.mkString(",")) - - // Override the Zookeeper URL if present. Throw exception if no address given. - zkUrl match { - case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url ) - case _ => { - if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) { - throw new UnsupportedOperationException( - s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided" - ) - } - } - } + val config = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, Some(conf)) // Encode the column info to a serializable type - val encodedColumns = ColumnInfoToStringEncoderDecoder.encode( - PhoenixConfigurationUtil.getUpsertColumnMetadataList(config) - ) + val encodedColumns = ConfigurationUtil.encodeColumns(config) // Map each element of the product to a new (NullWritable, PhoenixRecordWritable) val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { e => http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala index c19ec16..3fed79e 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala @@ -15,7 +15,7 @@ package org.apache.phoenix import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{DataFrame, SQLContext} package object spark { implicit def toProductRDDFunctions[A <: Product](rdd: RDD[A]): ProductRDDFunctions[A] = { @@ -29,4 +29,8 @@ package object spark { implicit def toSparkSqlContextFunctions(sqlContext: SQLContext): SparkSqlContextFunctions = { new SparkSqlContextFunctions(sqlContext) } + + implicit def toDataFrameFunctions(data: DataFrame): DataFrameFunctions = { + new DataFrameFunctions(data) + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b81dfb5..977218d 100644 --- a/pom.xml +++ b/pom.xml @@ -428,6 +428,11 @@ <artifactId>phoenix-pig</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-spark</artifactId> + <version>${project.version}</version> + </dependency> <!-- HBase dependencies --> <dependency>
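As a closing illustration of the pushdown path added in `PhoenixRelation` above: a sketch of the arguments Spark passes to `buildScan` for a typical filtered query, and the WHERE clause that `buildFilter` assembles from them (the filter classes come from Spark SQL's `sources` package; the values mirror the integration test data):

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// For a query such as:
//   df.filter(df("COL1") === "test_row_1" && df("ID") > 0L).select(df("ID"))
// Spark invokes PhoenixRelation.buildScan with roughly:
val requiredColumns = Array("ID")
val filters: Array[Filter] = Array(EqualTo("COL1", "test_row_1"), GreaterThan("ID", 0L))

// buildFilter joins the compiled terms with AND, producing the predicate
// " COL1 = 'test_row_1' AND ID > 0", which PhoenixRDD appends to the
// generated SELECT as its WHERE clause, so the filtering happens server-side
```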