Author: chinmayskulkarni
Date: Tue Nov 26 00:55:06 2019
New Revision: 1870430
URL: http://svn.apache.org/viewvc?rev=1870430&view=rev
Log: PHOENIX-5285: Add documentation for DataSourceV2 support in the phoenix-spark connector

Modified:
    phoenix/site/publish/language/datatypes.html
    phoenix/site/publish/language/functions.html
    phoenix/site/publish/language/index.html
    phoenix/site/publish/phoenix_spark.html

Modified: phoenix/site/publish/language/datatypes.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/language/datatypes.html?rev=1870430&r1=1870429&r2=1870430&view=diff
==============================================================================
--- phoenix/site/publish/language/datatypes.html (original)
+++ phoenix/site/publish/language/datatypes.html Tue Nov 26 00:55:06 2019
@@ -1,7 +1,7 @@
 <!DOCTYPE html>
 <!--
- Generated by Apache Maven Doxia at 2019-11-21
+ Generated by Apache Maven Doxia at 2019-11-25
  Rendered using Reflow Maven Skin 1.1.0 (http://andriusvelykis.github.io/reflow-maven-skin)
 -->
 <html xml:lang="en" lang="en">

Modified: phoenix/site/publish/language/functions.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/language/functions.html?rev=1870430&r1=1870429&r2=1870430&view=diff
==============================================================================
--- phoenix/site/publish/language/functions.html (original)
+++ phoenix/site/publish/language/functions.html Tue Nov 26 00:55:06 2019
@@ -1,7 +1,7 @@
 <!DOCTYPE html>
 <!--
- Generated by Apache Maven Doxia at 2019-11-21
+ Generated by Apache Maven Doxia at 2019-11-25
  Rendered using Reflow Maven Skin 1.1.0 (http://andriusvelykis.github.io/reflow-maven-skin)
 -->
 <html xml:lang="en" lang="en">

Modified: phoenix/site/publish/language/index.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/language/index.html?rev=1870430&r1=1870429&r2=1870430&view=diff
==============================================================================
--- phoenix/site/publish/language/index.html (original)
+++ phoenix/site/publish/language/index.html Tue Nov 26 00:55:06 2019
@@ -1,7 +1,7 @@
 <!DOCTYPE html>
 <!--
- Generated by Apache Maven Doxia at 2019-11-21
+ Generated by Apache Maven Doxia at 2019-11-25
  Rendered using Reflow Maven Skin 1.1.0 (http://andriusvelykis.github.io/reflow-maven-skin)
 -->
 <html xml:lang="en" lang="en">

Modified: phoenix/site/publish/phoenix_spark.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/phoenix_spark.html?rev=1870430&r1=1870429&r2=1870430&view=diff
==============================================================================
--- phoenix/site/publish/phoenix_spark.html (original)
+++ phoenix/site/publish/phoenix_spark.html Tue Nov 26 00:55:06 2019
@@ -1,7 +1,7 @@
 <!DOCTYPE html>
 <!--
- Generated by Apache Maven Doxia at 2019-11-21
+ Generated by Apache Maven Doxia at 2019-11-25
  Rendered using Reflow Maven Skin 1.1.0 (http://andriusvelykis.github.io/reflow-maven-skin)
 -->
 <html xml:lang="en" lang="en">
@@ -168,7 +168,7 @@ <div class="page-header">
  <h1>Apache Spark Plugin</h1>
 </div>
-<p>The phoenix-spark plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as RDDs or DataFrames, and enables persisting them back to Phoenix.</p>
+<p>The phoenix-spark plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames, and enables persisting them back to Phoenix.</p>
 <div class="section">
  <div class="section">
   <div class="section">
@@ -188,7 +188,8 @@ <h4 id="Spark_setup">Spark setup</h4>
 <ul>
  <li> <p>To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both "<i>spark.executor.extraClassPath</i>" and "<i>spark.driver.extraClassPath</i>" in spark-defaults.conf to include the "phoenix-<i><tt><version></tt></i>-client.jar"</p></li>
- <li> <p>Note that for Phoenix versions 4.7 and 4.8 you must use the "phoenix-<i><tt><version></tt></i>-client-spark.jar". As of Phoenix 4.10, the "phoenix-<i><tt><version></tt></i>-client.jar" is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the <tt>spark16</tt> maven profile. </p></li>
+ <li> <p>Note that for Phoenix versions 4.7 and 4.8 you must use the "phoenix-<i><tt><version></tt></i>-client-spark.jar". </p></li>
+ <li> <p>As of Phoenix 4.10, the "phoenix-<i><tt><version></tt></i>-client.jar" is compiled against Spark 2.x. If compatibility with Spark 1.x is needed, you must compile Phoenix with the <tt>spark16</tt> maven profile.</p></li>
  <li> <p>To help your IDE, you can add the following <i>provided</i> dependency to your build:</p></li>
 </ul>
 <div class="source">
@@ -200,11 +201,23 @@ </dependency>
 </pre>
 </div>
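+ <p>For illustration, here is a minimal sketch of the <tt>spark-defaults.conf</tt> entries described in the first bullet above; the <tt>/opt/phoenix/...</tt> path is hypothetical, so substitute the actual location and version of your client jar:</p>
+ <div class="source">
+ <pre># Hypothetical jar location; adjust to your installation
+spark.executor.extraClassPath /opt/phoenix/phoenix-<version>-client.jar
+spark.driver.extraClassPath   /opt/phoenix/phoenix-<version>-client.jar
+</pre>
+ </div>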
+ <ul>
+  <li>As of Phoenix 4.15.0, the connectors project will be separated from the main phoenix project (see <a class="externalLink" href="https://github.com/apache/phoenix-connectors">phoenix-connectors</a>) and will have its own releases. You can add the following dependency to your project:</li>
+ </ul>
+ <div class="source">
+ <pre><dependency>
+  <groupId>org.apache.phoenix</groupId>
+  <artifactId>phoenix-spark</artifactId>
+  <version>${phoenix.connectors.version}</version>
+</dependency>
+</pre>
+ </div>
+ <p>The first released connectors jar is <tt>connectors-1.0.0</tt> (replace <tt>phoenix.connectors.version</tt> above with this version).</p>
 </div>
 </div>
 <div class="section">
 <h3 id="Reading_Phoenix_Tables">Reading Phoenix Tables</h3>
- <p>Given a Phoenix table with the following DDL</p>
+ <p>Given a Phoenix table with the following DDL and DML:</p>
 <div class="source">
 <pre>CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
@@ -212,131 +225,112 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2,
</pre>
 </div>
 <div class="section">
- <h4 id="Load_as_a_DataFrame_using_the_Data_Source_API">Load as a DataFrame using the Data Source API</h4>
+ <h4 id="Load_as_a_DataFrame_using_the_DataSourceV2_API">Load as a DataFrame using the DataSourceV2 API</h4>
 <div class="source">
 <pre>import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+// Load data from TABLE1
+val df = spark.sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .load
 
-val df = sqlContext.load(
-  "org.apache.phoenix.spark",
-  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
-)
-
-df
-  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
+df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
   .select(df("ID"))
  .show
</pre>
 </div>
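+ <p>Since the result is an ordinary Spark DataFrame, you can also register it as a temporary view and query it with Spark SQL. This is a minimal sketch using the standard Spark SQL API rather than anything phoenix-specific; it continues from the example above, and the view name <tt>TABLE1_VIEW</tt> is arbitrary:</p>
+ <div class="source">
+ <pre>// Register the Phoenix-backed DataFrame as a temporary view
+df.createOrReplaceTempView("TABLE1_VIEW")
+
+// The same filter and projection as above, expressed in SQL
+spark.sql("SELECT ID FROM TABLE1_VIEW WHERE COL1 = 'test_row_1'").show
+</pre>
+ </div>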
 </div>
+ </div>
+ <div class="section">
+ <h3 id="Saving_to_Phoenix">Saving to Phoenix</h3>
 <div class="section">
- <h4 id="Load_as_a_DataFrame_directly_using_a_Configuration_object">Load as a DataFrame directly using a Configuration object</h4>
- <div class="source">
- <pre>import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val configuration = new Configuration()
-// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
-val df = sqlContext.phoenixTableAsDataFrame(
-  "TABLE1", Array("ID", "COL1"), conf = configuration
-)
-
-df.show
-</pre>
- </div>
- </div>
- <div class="section">
- <h4 id="Load_as_an_RDD_using_a_Zookeeper_URL">Load as an RDD, using a Zookeeper URL</h4>
+ <h4 id="Save_DataFrames_to_Phoenix_using_DataSourceV2">Save DataFrames to Phoenix using DataSourceV2</h4>
+ <p>The <tt>save</tt> method on a DataFrame allows passing in a data source type. You can use <tt>phoenix</tt> for DataSourceV2 and must also pass in <tt>table</tt> and <tt>zkUrl</tt> parameters to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame's schema field names, and must match the Phoenix column names.</p>
+ <p>The <tt>save</tt> method also takes a <tt>SaveMode</tt> option, for which only <tt>SaveMode.Overwrite</tt> is supported.</p>
+ <p>Given two Phoenix tables with the following DDL:</p>
 <div class="source">
- <pre>import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
-val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
-  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
-)
-
-rdd.count()
-
-val firstId = rdd1.first()("ID").asInstanceOf[Long]
-val firstCol = rdd1.first()("COL1").asInstanceOf[String]
+ <pre>CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
</pre>
 </div>
- </div>
- </div>
- <div class="section">
- <h3 id="Saving_Phoenix">Saving Phoenix</h3>
- <p>Given a Phoenix table with the following DDL</p>
- <div class="source">
- <pre>CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
-</pre>
- </div>
- <div class="section">
- <h4 id="Saving_RDDs">Saving RDDs</h4>
- <p>The <tt>saveToPhoenix</tt> method is an implicit method on RDD[Product], or an RDD of Tuples. The data types must correspond to one of <a href="language/datatypes.html">the Java types supported by Phoenix</a>.</p>
+ <p>you can load from an input table and save to an output table as a DataFrame as follows:</p>
 <div class="source">
 <pre>import org.apache.spark.SparkContext
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
-val sc = new SparkContext("local", "phoenix-test")
-val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+// Load INPUT_TABLE
+val df = spark.sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+  .load
 
-sc
-  .parallelize(dataSet)
-  .saveToPhoenix(
-    "OUTPUT_TEST_TABLE",
-    Seq("ID","COL1","COL2"),
-    zkUrl = Some("phoenix-server:2181")
-  )
+// Save to OUTPUT_TABLE
+df
+  .write
+  .format("phoenix")
+  .mode(SaveMode.Overwrite)
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+  .save()
</pre>
 </div>
 </div>
 <div class="section">
- <h4 id="Saving_DataFrames">Saving DataFrames</h4>
- <p>The <tt>save</tt> is method on DataFrame allows passing in a data source type. You can use <tt>org.apache.phoenix.spark</tt>, and must also pass in a <tt>table</tt> and <tt>zkUrl</tt> parameter to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame's schema field names, and must match the Phoenix column names.</p>
- <p>The <tt>save</tt> method also takes a <tt>SaveMode</tt> option, for which only <tt>SaveMode.Overwrite</tt> is supported.</p>
- <p>Given two Phoenix tables with the following DDL:</p>
+ <h4 id="Save_from_an_external_RDD_with_a_schema_to_a_Phoenix_table">Save from an external RDD with a schema to a Phoenix table</h4>
+ <p>Just like the previous example, you can pass in the data source type as <tt>phoenix</tt> and specify the <tt>table</tt> and <tt>zkUrl</tt> parameters indicating which table and server to persist the DataFrame to.</p>
+ <p>Note that the schema of the RDD must match its column data, and both must match the schema of the Phoenix table that you save to.</p>
+ <p>Given an output Phoenix table with the following DDL:</p>
 <div class="source">
- <pre>CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
-CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+ <pre>CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
</pre>
 </div>
+ <p>you can save a DataFrame built from an RDD as follows:</p>
 <div class="source">
 <pre>import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.phoenix.spark._
-
-// Load INPUT_TABLE
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE",
-  "zkUrl" -> hbaseConnectionString))
-
-// Save to OUTPUT_TABLE
-df.saveToPhoenix(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> hbaseConnectionString))
-
-or
-
-df.write \
-  .format("org.apache.phoenix.spark") \
-  .mode("overwrite") \
-  .option("table", "OUTPUT_TABLE") \
-  .option("zkUrl", "localhost:2181") \
-  .save()
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
+
+val schema = StructType(
+  Seq(StructField("ID", LongType, nullable = false),
+    StructField("COL1", StringType),
+    StructField("COL2", IntegerType)))
+
+val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+// Apply the schema to the RDD.
+val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+df.write
+  .format("phoenix")
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "quorumAddress"))
+  .mode(SaveMode.Overwrite)
+  .save()
</pre>
 </div>
 </div>
@@ -349,7 +343,7 @@ df.write \
 <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>localhost:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p>
 <div class="source">
 <pre>df = sqlContext.read \
-  .format("org.apache.phoenix.spark") \
+  .format("phoenix") \
  .option("table", "TABLE1") \
  .option("zkUrl", "localhost:2181") \
  .load()
@@ -361,7 +355,7 @@ df.write \
 <p>Given the same table and Zookeeper URLs above, you can save a DataFrame to a Phoenix table using the following code</p>
 <div class="source">
 <pre>df.write \
-  .format("org.apache.phoenix.spark") \
+  .format("phoenix") \
  .mode("overwrite") \
  .option("table", "TABLE1") \
  .option("zkUrl", "localhost:2181") \
@@ -372,8 +366,11 @@ df.write \
 </div>
 <div class="section">
 <h3 id="Notes">Notes</h3>
- <p>The functions <tt>phoenixTableAsDataFrame</tt>, <tt>phoenixTableAsRDD</tt> and <tt>saveToPhoenix</tt> all support optionally specifying a <tt>conf</tt> Hadoop configuration parameter with custom Phoenix client settings, as well as an optional <tt>zkUrl</tt> parameter for the Phoenix connection URL.</p>
- <p>If <tt>zkUrl</tt> isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set in the <tt>conf</tt> parameter. Similarly, if no configuration is passed in, <tt>zkUrl</tt> must be specified.</p>
+ <ul>
+ <li>If you want to use DataSourceV1, you can use source type <tt>"org.apache.phoenix.spark"</tt> instead of <tt>"phoenix"</tt>; however, this is deprecated as of <tt>connectors-1.0.0</tt> (a short sketch follows this list).</li>
+ <li>The (deprecated) functions <tt>phoenixTableAsDataFrame</tt>, <tt>phoenixTableAsRDD</tt> and <tt>saveToPhoenix</tt> all support optionally specifying a <tt>conf</tt> Hadoop configuration parameter with custom Phoenix client settings, as well as an optional <tt>zkUrl</tt> parameter for the Phoenix connection URL.</li>
+ <li>If <tt>zkUrl</tt> isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set in the <tt>conf</tt> parameter. Similarly, if no configuration is passed in, <tt>zkUrl</tt> must be specified.</li>
+ </ul>
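+ <p>For reference, here is a minimal sketch of the deprecated DataSourceV1 read mentioned in the first note above, reusing the SparkSession <tt>spark</tt> from the earlier examples; apart from the source type string, it mirrors the DataSourceV2 usage:</p>
+ <div class="source">
+ <pre>// Deprecated as of connectors-1.0.0: DataSourceV1 uses the full class name as source type
+val dfV1 = spark.sqlContext
+  .read
+  .format("org.apache.phoenix.spark")
+  .option("table", "TABLE1")
+  .option("zkUrl", "phoenix-server:2181")
+  .load()
+</pre>
+ </div>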
 </div>
 <div class="section">
 <h3 id="Limitations">Limitations</h3>
@@ -428,6 +425,80 @@ pr.vertices.saveToPhoenix("EMAIL_EN
 </pre>
 </div></li>
 </ol>
+ <hr />
+ </div>
+ <div class="section">
+ <h3 id="Deprecated_Usages">Deprecated Usages</h3>
+ <div class="section">
+ <h4 id="Load_as_a_DataFrame_directly_using_a_Configuration_object">Load as a DataFrame directly using a Configuration object</h4>
+ <div class="source">
+ <pre>import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val configuration = new Configuration()
+// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
+val df = sqlContext.phoenixTableAsDataFrame(
+  "TABLE1", Array("ID", "COL1"), conf = configuration
+)
+
+df.show
+</pre>
+ </div>
+ </div>
+ <div class="section">
+ <h4 id="Load_as_an_RDD_using_a_Zookeeper_URL">Load as an RDD, using a Zookeeper URL</h4>
+ <div class="source">
+ <pre>import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+import org.apache.spark.rdd.RDD
+
+val sc = new SparkContext("local", "phoenix-test")
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
+val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
+  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+)
+
+rdd.count()
+
+val firstId = rdd.first()("ID").asInstanceOf[Long]
+val firstCol = rdd.first()("COL1").asInstanceOf[String]
+</pre>
+ </div>
+ </div>
+ <div class="section">
+ <h4 id="Saving_RDDs_to_Phoenix">Saving RDDs to Phoenix</h4>
+ <p><tt>saveToPhoenix</tt> is an implicit method on RDD[Product], or an RDD of Tuples. The data types must correspond to the Java types Phoenix supports (<a class="externalLink" href="http://phoenix.apache.org/language/datatypes.html">http://phoenix.apache.org/language/datatypes.html</a>).</p>
+ <p>Given a Phoenix table with the following DDL:</p>
+ <div class="source">
+ <pre>CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+</pre>
+ </div>
+ <div class="source">
+ <pre>import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+sc
+  .parallelize(dataSet)
+  .saveToPhoenix(
+    "OUTPUT_TEST_TABLE",
+    Seq("ID","COL1","COL2"),
+    zkUrl = Some("phoenix-server:2181")
+  )
+</pre>
+ </div>
+ </div>
 </div>
 </div>
 </div>