Author: chinmayskulkarni
Date: Wed Nov 27 03:15:21 2019
New Revision: 1870483
URL: http://svn.apache.org/viewvc?rev=1870483&view=rev
Log:
PHOENIX-5585: Add documentation for Phoenix-Spark Java example

Modified:
    phoenix/site/publish/phoenix_spark.html
    phoenix/site/source/src/site/markdown/phoenix_spark.md

Modified: phoenix/site/publish/phoenix_spark.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/phoenix_spark.html?rev=1870483&r1=1870482&r2=1870483&view=diff
==============================================================================
--- phoenix/site/publish/phoenix_spark.html (original)
+++ phoenix/site/publish/phoenix_spark.html Wed Nov 27 03:15:21 2019
@@ -226,6 +226,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2,
 </div> 
 <div class="section"> 
  <h4 id="Load_as_a_DataFrame_using_the_DataSourceV2_API">Load as a DataFrame using the DataSourceV2 API</h4> 
+ <p>Scala example: </p> 
  <div class="source"> 
   <pre>import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -249,6 +250,40 @@ df.filter(df("COL1") === "
   .show
</pre> 
  </div> 
+ <p>Java example: </p> 
+ <div class="source"> 
+  <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load data from TABLE1
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "TABLE1")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        df.createOrReplaceTempView("TABLE1");
+
+        SQLContext sqlCtx = new SQLContext(jsc);
+        df = sqlCtx.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+        df.show();
+        jsc.stop();
+    }
+}
+</pre> 
+ </div> 
 </div> 
 </div> 
 <div class="section"> 
@@ -263,7 +298,7 @@ df.filter(df("COL1") === "
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER); 
</pre> 
 </div> 
- <p>you can load from an input table and save to an output table as a DataFrame as follows:</p> 
+ <p>you can load from an input table and save to an output table as a DataFrame as follows in Scala:</p> 
 <div class="source"> 
  <pre>import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
@@ -279,18 +314,55 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
   .load
 
 // Save to OUTPUT_TABLE
-df
-  .write
+df.write
   .format("phoenix")
   .mode(SaveMode.Overwrite)
-  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
   .save()
</pre> 
 </div> 
+ <p>Java example: </p> 
+ <div class="source"> 
+  <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load INPUT_TABLE
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "INPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+
+        // Save to OUTPUT_TABLE
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+        jsc.stop();
+    }
+}
+</pre> 
+ </div> 
 </div> 
 <div class="section"> 
  <h4 id="Save_from_an_external_RDD_with_a_schema_to_a_Phoenix_table">Save from an external RDD with a schema to a Phoenix table</h4> 
@@ -301,7 +373,7 @@ df
 <pre>CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER); 
</pre> 
 </div> 
- <p>you can save a dataframe from an RDD as follows: </p> 
+ <p>you can save a DataFrame from an RDD as follows in Scala: </p> 
 <div class="source"> 
  <pre>import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
@@ -328,11 +400,66 @@ val df = spark.sqlContext.createDataFram
 df.write
   .format("phoenix")
-  .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "quorumAddress"))
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
   .mode(SaveMode.Overwrite)
   .save()
</pre> 
 </div> 
+ <p>Java example: </p> 
+ <div class="source"> 
+  <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = sqlContext.sparkSession();
+        Dataset<Row> df;
+
+        // Generate the schema based on the fields
+        List<StructField> fields = new ArrayList<>();
+        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+        StructType schema = DataTypes.createStructType(fields);
+
+        // Generate the rows with the same exact schema
+        List<Row> rows = new ArrayList<>();
+        for (int i = 1; i < 4; i++) {
+            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+        }
+
+        // Create a DataFrame from the rows and the specified schema
+        df = spark.createDataFrame(rows, schema);
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+
+        jsc.stop();
+    }
+}
+</pre> 
+ </div> 
 </div> 
 </div> 
 <div class="section"> 
@@ -340,12 +467,12 @@ df.write
 <p>With Spark's DataFrame support, you can also use <tt>pyspark</tt> to read and write from Phoenix tables.</p> 
 <div class="section"> 
  <h4 id="Load_a_DataFrame">Load a DataFrame</h4> 
- <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>localhost:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p> 
+ <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>phoenix-server:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p> 
 <div class="source"> 
  <pre>df = sqlContext.read \
   .format("phoenix") \
   .option("table", "TABLE1") \
-  .option("zkUrl", "localhost:2181") \
+  .option("zkUrl", "phoenix-server:2181") \
   .load()
</pre> 
 </div> 
@@ -358,7 +485,7 @@ df.write
   .format("phoenix") \
   .mode("overwrite") \
   .option("table", "TABLE1") \
-  .option("zkUrl", "localhost:2181") \
+  .option("zkUrl", "phoenix-server:2181") \
   .save()
</pre> 
 </div> 
@@ -377,7 +504,7 @@ df.write
   .sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "Table1", "zkUrl" -> "hosta,hostb,hostc",
+  .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181",
     "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
   .load;
</pre> 

Modified: phoenix/site/source/src/site/markdown/phoenix_spark.md
URL: http://svn.apache.org/viewvc/phoenix/site/source/src/site/markdown/phoenix_spark.md?rev=1870483&r1=1870482&r2=1870483&view=diff
==============================================================================
--- phoenix/site/source/src/site/markdown/phoenix_spark.md (original)
+++ phoenix/site/source/src/site/markdown/phoenix_spark.md Wed Nov 27 03:15:21 2019
@@ -1,7 +1,7 @@
 # Apache Spark Plugin
 
 The phoenix-spark plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables
-as RDDs or DataFrames, and enables persisting them back to Phoenix.
+as DataFrames, and enables persisting them back to Phoenix.
 
 #### Prerequisites
 
@@ -28,8 +28,10 @@ The choice of which method to use to acc
 for the Spark executors and drivers, set both '_spark.executor.extraClassPath_' and
 '_spark.driver.extraClassPath_' in spark-defaults.conf to include the 'phoenix-_`<version>`_-client.jar'
 
-* Note that for Phoenix versions 4.7 and 4.8 you must use the 'phoenix-_`<version>`_-client-spark.jar'. As of Phoenix 4.10, the 'phoenix-_`<version>`_-client.jar' is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the `spark16` maven profile.
-
+* Note that for Phoenix versions 4.7 and 4.8 you must use the 'phoenix-_`<version>`_-client-spark.jar'.
+
+* As of Phoenix 4.10, the 'phoenix-_`<version>`_-client.jar' is compiled against Spark 2.x. If compatibility with Spark 1.x is needed, you must compile Phoenix with the `spark16` maven profile.
+
 * To help your IDE, you can add the following _provided_ dependency to your build:
 
 ```
 <dependency>
   <groupId>org.apache.phoenix</groupId>
   <artifactId>phoenix-spark</artifactId>
   <version>${phoenix.version}</version>
   <scope>provided</scope>
 </dependency>
 ```
@@ -41,9 +43,21 @@ for the Spark executors and drivers, set
 </dependency>
 ```
 
+* As of Phoenix 4.15.0, the connectors project will be separated from the main phoenix project (see [phoenix-connectors](https://github.com/apache/phoenix-connectors))
+and will have its own releases. You can add the following dependency in your project:
+
+```
+<dependency>
+  <groupId>org.apache.phoenix</groupId>
+  <artifactId>phoenix-spark</artifactId>
+  <version>${phoenix.connectors.version}</version>
+</dependency>
+```
+The first released connectors jar is `connectors-1.0.0` (replace `phoenix.connectors.version` above with this version).
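+
+For example, the `spark-defaults.conf` classpath settings mentioned above might look like the following (a minimal sketch; the `/opt/phoenix` install path and the jar version are illustrative and depend on your installation):
+
+```
+spark.executor.extraClassPath /opt/phoenix/phoenix-<version>-client.jar
+spark.driver.extraClassPath /opt/phoenix/phoenix-<version>-client.jar
+```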
+
 ### Reading Phoenix Tables
 
-Given a Phoenix table with the following DDL
+Given a Phoenix table with the following DDL and DML:
 
 ```sql
 CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
@@ -51,134 +65,251 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (1,
 UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
 ```
 
-#### Load as a DataFrame using the Data Source API
+#### Load as a DataFrame using the DataSourceV2 API
+Scala example:
+
 ```scala
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
-val df = sqlContext.load(
-  "org.apache.phoenix.spark",
-  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
-)
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+// Load data from TABLE1
+val df = spark.sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .load
 
-df
-  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
+df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
   .select(df("ID"))
   .show
 ```
 
-#### Load as a DataFrame directly using a Configuration object
-```scala
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val configuration = new Configuration()
-// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
-val df = sqlContext.phoenixTableAsDataFrame(
-  "TABLE1", Array("ID", "COL1"), conf = configuration
-)
+Java example:
-df.show
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load data from TABLE1
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "TABLE1")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        df.createOrReplaceTempView("TABLE1");
+
+        SQLContext sqlCtx = new SQLContext(jsc);
+        df = sqlCtx.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+        df.show();
+        jsc.stop();
+    }
+}
 ```
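+
+A compiled example such as `PhoenixSparkRead` can be run with `spark-submit`. A minimal sketch (the application jar name and paths here are illustrative placeholders, not fixed names):
+
+```
+spark-submit --class PhoenixSparkRead \
+  --jars /path/to/phoenix-<version>-client.jar \
+  /path/to/your-application.jar
+```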
-#### Load as an RDD, using a Zookeeper URL
-```scala
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
-val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
-  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
-)
+### Saving to Phoenix
-rdd.count()
+#### Save DataFrames to Phoenix using DataSourceV2
-val firstId = rdd1.first()("ID").asInstanceOf[Long]
-val firstCol = rdd1.first()("COL1").asInstanceOf[String]
-```
+The `save` method on DataFrame allows passing in a data source type. You can use
+`phoenix` for DataSourceV2 and must also pass in `table` and `zkUrl` parameters to
+specify which table and server to persist the DataFrame to. The column names are derived from
+the DataFrame's schema field names, and must match the Phoenix column names.
-### Saving Phoenix
+The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
-Given a Phoenix table with the following DDL
+Given two Phoenix tables with the following DDL:
 
 ```sql
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
-
-#### Saving RDDs
-
-The `saveToPhoenix` method is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to one of [the Java types supported by Phoenix](language/datatypes.html).
-
+you can load from an input table and save to an output table as a DataFrame as follows in Scala:
 ```scala
 import org.apache.spark.SparkContext
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
-val sc = new SparkContext("local", "phoenix-test")
-val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+// Load INPUT_TABLE
+val df = spark.sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .load
 
-sc
-  .parallelize(dataSet)
-  .saveToPhoenix(
-    "OUTPUT_TEST_TABLE",
-    Seq("ID","COL1","COL2"),
-    zkUrl = Some("phoenix-server:2181")
-  )
+// Save to OUTPUT_TABLE
+df.write
+  .format("phoenix")
+  .mode(SaveMode.Overwrite)
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .save()
 ```
-#### Saving DataFrames
+Java example:
-The `save` is method on DataFrame allows passing in a data source type. You can use
-`org.apache.phoenix.spark`, and must also pass in a `table` and `zkUrl` parameter to
-specify which table and server to persist the DataFrame to. The column names are derived from
-the DataFrame's schema field names, and must match the Phoenix column names.
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load INPUT_TABLE
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "INPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+
+        // Save to OUTPUT_TABLE
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+        jsc.stop();
+    }
+}
+```
+
+#### Save from an external RDD with a schema to a Phoenix table
-The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
+Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` and
+`zkUrl` parameters indicating which table and server to persist the DataFrame to.
-Given two Phoenix tables with the following DDL:
+Note that the schema of the RDD must match its column data, and both must match the schema of the Phoenix table
+that you save to.
+
+Given an output Phoenix table with the following DDL:
 
 ```sql
-CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
+you can save a DataFrame from an RDD as follows in Scala:
 ```scala
 import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.phoenix.spark._
-
-// Load INPUT_TABLE
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE",
-  "zkUrl" -> hbaseConnectionString))
-
-// Save to OUTPUT_TABLE
-df.saveToPhoenix(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> hbaseConnectionString))
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
+
+val schema = StructType(
+  Seq(StructField("ID", LongType, nullable = false),
+    StructField("COL1", StringType),
+    StructField("COL2", IntegerType)))
+
+val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+// Apply the schema to the RDD.
+val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+df.write
+  .format("phoenix")
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .mode(SaveMode.Overwrite)
+  .save()
+```
-or
+Java example:
-df.write \
-  .format("org.apache.phoenix.spark") \
-  .mode("overwrite") \
-  .option("table", "OUTPUT_TABLE") \
-  .option("zkUrl", "localhost:2181") \
-  .save()
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = sqlContext.sparkSession();
+        Dataset<Row> df;
+
+        // Generate the schema based on the fields
+        List<StructField> fields = new ArrayList<>();
+        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+        StructType schema = DataTypes.createStructType(fields);
+
+        // Generate the rows with the same exact schema
+        List<Row> rows = new ArrayList<>();
+        for (int i = 1; i < 4; i++) {
+            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+        }
+
+        // Create a DataFrame from the rows and the specified schema
+        df = spark.createDataFrame(rows, schema);
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+
+        jsc.stop();
+    }
+}
 ```
 
 ### PySpark
 
@@ -187,14 +318,14 @@ With Spark's DataFrame support, you can
 
 #### Load a DataFrame
 
-Given a table _TABLE1_ and a Zookeeper url of `localhost:2181` you can load the table as a
+Given a table _TABLE1_ and a Zookeeper url of `phoenix-server:2181` you can load the table as a
 DataFrame using the following Python code in `pyspark`
 
 ```python
 df = sqlContext.read \
-  .format("org.apache.phoenix.spark") \
+  .format("phoenix") \
   .option("table", "TABLE1") \
-  .option("zkUrl", "localhost:2181") \
+  .option("zkUrl", "phoenix-server:2181") \
   .load()
 ```
 
@@ -205,22 +336,38 @@ using the following code
 
 ```python
 df.write \
-  .format("org.apache.phoenix.spark") \
+  .format("phoenix") \
   .mode("overwrite") \
  .option("table", "TABLE1") \
-  .option("zkUrl", "localhost:2181") \
+  .option("zkUrl", "phoenix-server:2181") \
   .save()
 ```
 
 ### Notes
 
-The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
+  instead of `"phoenix"`; however, this is deprecated as of `connectors-1.0.0`.
+- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
 optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings, as well
 as an optional `zkUrl` parameter for the Phoenix connection URL.
-
-If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
+- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
 in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
+- As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
+to executors as a comma-separated list against the key `phoenixConfigs`, i.e. (`PhoenixDataSource.PHOENIX_CONFIGS`), for example:
+
+```scala
+df = spark
+  .sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181",
+    "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+  .load;
+```
+This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
+Note that the same property values will be used for both the driver and all executors, and
+that these configurations are used each time a connection is made (both on the driver and executors).
 
 ### Limitations
 
@@ -281,3 +428,74 @@ saves the results back to Phoenix.
 | 588 | 106.11840798585399 |
 +------------------------------------------+------------------------------------------+
 ```
+***
+
+### Deprecated Usages
+
+#### Load as a DataFrame directly using a Configuration object
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val configuration = new Configuration()
+// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
+val df = sqlContext.phoenixTableAsDataFrame(
+  "TABLE1", Array("ID", "COL1"), conf = configuration
+)
+
+df.show
+```
+
+#### Load as an RDD, using a Zookeeper URL
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+import org.apache.spark.rdd.RDD
+
+val sc = new SparkContext("local", "phoenix-test")
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
+val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
+  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+)
+
+rdd.count()
+
+val firstId = rdd.first()("ID").asInstanceOf[Long]
+val firstCol = rdd.first()("COL1").asInstanceOf[String]
+```
+
+#### Saving RDDs to Phoenix
+
+`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
+correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html).
+
+Given a Phoenix table with the following DDL:
+
+```sql
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+```
+
+```scala
+import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+sc
+  .parallelize(dataSet)
+  .saveToPhoenix(
+    "OUTPUT_TEST_TABLE",
+    Seq("ID","COL1","COL2"),
+    zkUrl = Some("phoenix-server:2181")
+  )
+```
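+
+The Notes above mention that these deprecated functions also accept a `conf` Hadoop configuration. A minimal sketch of that variant, assuming the optional `conf` parameter of `saveToPhoenix` described in the Notes, with "hbase.zookeeper.quorum" set on the configuration so `zkUrl` can be omitted:
+
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+// Setting "hbase.zookeeper.quorum" on the configuration takes the place of zkUrl
+val configuration = new Configuration()
+configuration.set("hbase.zookeeper.quorum", "phoenix-server:2181")
+
+val sc = new SparkContext("local", "phoenix-test")
+
+// Save the same tuples as above, resolving the cluster from the configuration
+sc
+  .parallelize(List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3)))
+  .saveToPhoenix("OUTPUT_TEST_TABLE", Seq("ID", "COL1", "COL2"), conf = configuration)
+```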