stoty commented on code in PR #139:
URL: 
https://github.com/apache/phoenix-connectors/pull/139#discussion_r1734210681


##########
phoenix5-spark/README.md:
##########
@@ -45,7 +57,7 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "TABLE1"))
+  .options(Map("table" -> "TABLE1", "jdbcUrl" -> "jdbc:phoenix:zkHost:zkport"))

Review Comment:
   Specifying an explicit jdbcURL is somewhat of an anti-pattern.
   Ideally, the connection info should be taken from hbase-site.xml



##########
phoenix5-spark/README.md:
##########
@@ -18,6 +18,18 @@ limitations under the License.
 phoenix-spark extends Phoenix's MapReduce support to allow Spark to load 
Phoenix tables as DataFrames,
 and enables persisting DataFrames back to Phoenix.
 
+## Configuration properties

Review Comment:
   Nice
   



##########
phoenix5-spark/src/it/resources/globalSetup.sql:
##########
@@ -60,7 +60,7 @@ UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
 UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
 UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
 
-CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, 
ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR  CONSTRAINT pk PRIMARY KEY 
(TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
+CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, 
ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR  CONSTRAINT pk PRIMARY KEY 
(TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true, SALT_BUCKETS = 20

Review Comment:
   If we want to test salting, we should create a new table and new tests for 
that.



##########
phoenix5-spark/README.md:
##########
@@ -249,14 +307,13 @@ public class PhoenixSparkWriteFromRDDWithSchema {
         }
   
         // Create a DataFrame from the rows and the specified schema
-        df = spark.createDataFrame(rows, schema);
+        Dataset<Row> df = spark.createDataFrame(rows, schema);
         df.write()
             .format("phoenix")
             .mode(SaveMode.Overwrite)
             .option("table", "OUTPUT_TABLE")
+            .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
             .save();
-  

Review Comment:
   Shouldn't we close the session ?
   While this is only an example, I think that it should teach proper resource 
management.



##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java:
##########
@@ -33,21 +37,27 @@
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
+import scala.collection.immutable.Map;
 
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 /**
  * Implements the DataSourceV2 api to read and write from Phoenix tables
  */
-public class PhoenixDataSource  implements DataSourceV2,  ReadSupport, 
WriteSupport, DataSourceRegister {
+public class PhoenixDataSource implements DataSourceV2, ReadSupport, 
WriteSupport, DataSourceRegister, RelationProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(PhoenixDataSource.class);
     public static final String SKIP_NORMALIZING_IDENTIFIER = 
"skipNormalizingIdentifier";
     @Deprecated
     public static final String ZOOKEEPER_URL = "zkUrl";
     public static final String JDBC_URL = "jdbcUrl";
     public static final String PHOENIX_CONFIGS = "phoenixconfigs";
+    public static final String TABLE = "table";
+    public static final String DATE_AS_TIME_STAMP = "dateAsTimestamp";
+    public static final String DO_NOT_MAP_COLUMN_FAMILY = 
"doNotMapColumnFamily";
+    public static final String TENANT_ID = "tenantId";

Review Comment:
   Use PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID as the value instead of 
repeating the string?



##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala:
##########
@@ -14,14 +14,14 @@
 package org.apache.phoenix.spark
 
 import java.sql.DriverManager
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.HConstants
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.jdbc.PhoenixDriver
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.query.HBaseFactoryProvider
+import org.apache.phoenix.spark.datasource.v2.SparkSchemaUtil

Review Comment:
   looks like an unused import.



##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkDatasourceV1IT.scala:
##########
@@ -0,0 +1,807 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
+import org.apache.phoenix.query.QueryServices
+import org.apache.phoenix.schema.types.{PSmallintArray, PVarchar}
+import 
org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import 
org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, 
PhoenixTestingDataSource, SparkSchemaUtil}
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SaveMode}
+
+import java.sql.DriverManager
+import java.util.Date
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Note: If running directly from an IDE, these are the recommended VM 
parameters:
+  * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+  */
+class PhoenixSparkDatasourceV1IT extends AbstractPhoenixSparkIT {

Review Comment:
   IIRC AbstractPhoenixSparkIT is only used by the V1 tests.
   If you rename the tests, it would make sense to also rename the parent class.



##########
phoenix5-spark/README.md:
##########
@@ -89,7 +147,7 @@ public class PhoenixSparkRead {
 ### Save DataFrames to Phoenix using DataSourceV2
 
 The `save` is method on DataFrame allows passing in a data source type. You 
can use
-`phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl` 
parameter to
+`phoenix` for DataSourceV2 and must also pass in a `table` and `jdbcUrl` 
parameter to

Review Comment:
   This is not true anymore (it was already wrong, but since you're updating 
the docs, let's fix this).
   jdbcUrl is not mandatory.



##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkDatasourceV1ITTenantSpecific.scala:
##########
@@ -105,39 +102,6 @@ class PhoenixSparkITTenantSpecific extends 
AbstractPhoenixSparkIT {
     verifyResults
   }
 
-  ignore("Can write a DataFrame using 'DataFrame.write' to tenant-specific 
view - Spark2 sparse columns") {

Review Comment:
   I am not sure why this is ignored, but I assume that there is a bug that 
this would catch.
   It'd be better to keep this.



##########
phoenix5-spark/README.md:
##########
@@ -141,32 +201,32 @@ public class PhoenixSparkWriteFromInputTable {
     
     public static void main() throws Exception {
         SparkConf sparkConf = new 
SparkConf().setMaster("local").setAppName("phoenix-test")
-          .set("spark.hadoopRDD.ignoreEmptySplits", "false");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        SQLContext sqlContext = new SQLContext(jsc);
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
+        SparkSessinon spark = 
SparkSession.builder().config(sparkConf).getOrCreate();
         
         // Load INPUT_TABLE
-        Dataset<Row> df = sqlContext
+        Dataset<Row> df = spark
             .read()
             .format("phoenix")
             .option("table", "INPUT_TABLE")
+            .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
             .load();
         
         // Save to OUTPUT_TABLE
         df.write()
           .format("phoenix")
           .mode(SaveMode.Overwrite)
           .option("table", "OUTPUT_TABLE")
+          .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
           .save();
-        jsc.stop();
     }
 }
 ```
 
 ### Save from an external RDD with a schema to a Phoenix table
 
 Just like the previous example, you can pass in the data source type as 
`phoenix` and specify the `table` and
-`zkUrl` parameters indicating which table and server to persist the DataFrame 
to.
+`jdbcUrl` parameters indicating which table and server to persist the 
DataFrame to.

Review Comment:
   same, jdbcUrl is optional.



##########
phoenix5-spark/README.md:
##########
@@ -65,21 +77,67 @@ public class PhoenixSparkRead {
     public static void main() throws Exception {
         SparkConf sparkConf = new 
SparkConf().setMaster("local").setAppName("phoenix-test")
             .set("spark.hadoopRDD.ignoreEmptySplits", "false");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSessinon spark = 
SparkSession.builder().config(sparkConf).getOrCreate();
         
         // Load data from TABLE1
-        Dataset<Row> df = sqlContext
+        Dataset<Row> df = spark
             .read()
             .format("phoenix")
             .option("table", "TABLE1")
+            .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")    

Review Comment:
   see above, this applies to all uses with explicit jdbcUrl 



##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java:
##########
@@ -207,6 +208,27 @@ public Filter[] pushedFilters() {
 
     @Override
     public void pruneColumns(StructType schema) {
-        this.schema = schema;
+        if (schema.fields() != null && schema.fields().length != 0)
+            this.schema = schema;
+    }
+
+    //TODO Method PhoenixRuntime.generateColumnInfo skip only salt column, add 
skip tenant_id column.
+    private List<ColumnInfo> generateColumnInfo(Connection conn, String 
tableName) throws SQLException {

Review Comment:
   You may want to consider exposing this functionality in PhoenixRuntime, or 
the Phoenix MR classes, if it is not yet exposed.
   
   This is for later.



##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkSqlIT.scala:
##########
@@ -0,0 +1,95 @@
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.spark.sql.Row
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Note: If running directly from an IDE, these are the recommended VM 
parameters:
+ * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ */
+class PhoenixSparkSqlIT extends AbstractPhoenixSparkIT {
+  val sqlTableName = "SQL_TABLE"
+
+  after {
+    spark.sql(s"DROP TABLE IF EXISTS  $sqlTableName")
+  }
+
+  test("Can read from table using spark-sql") {
+    val expected : Array[Row] = Array(
+      Row.fromSeq(Seq(1, "test_row_1")),
+      Row.fromSeq(Seq(2, "test_row_2"))
+    )
+
+    spark.sql(s"CREATE TABLE $sqlTableName USING phoenix " +
+      s"OPTIONS ('table' 'TABLE1', '${PhoenixDataSource.JDBC_URL}' 
'$jdbcUrl')")
+
+    val dataFrame = spark.sql(s"SELECT * FROM $sqlTableName")
+
+    dataFrame.show()
+
+    dataFrame.collect() shouldEqual expected
+  }
+
+  test("Can read from table using spark-sql with where clause and selecting 
specific columns`") {
+    val expected : Array[Row] = Array(
+      Row.fromSeq(Seq("test_row_1"))
+    )
+
+    spark.sql(s"CREATE TABLE $sqlTableName USING phoenix " +
+      s"OPTIONS ('table' 'TABLE1', '${PhoenixDataSource.JDBC_URL}' 
'$jdbcUrl')")
+
+    val dataFrame = spark.sql(s"SELECT COL1 as LABEL FROM $sqlTableName where 
ID=1")
+    dataFrame.show

Review Comment:
   While these may be useful for debugging, in general we shouldn't print to 
stdout in tests.



##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala:
##########
@@ -18,8 +18,9 @@
 package org.apache.phoenix.spark
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.phoenix.spark.datasource.v2.FilterExpressionCompiler

Review Comment:
   looks like an unused import



##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java:
##########
@@ -56,7 +66,7 @@ public DataSourceReader createReader(DataSourceOptions 
options) {
 
     @Override
     public Optional<DataSourceWriter> createWriter(String writeUUID, 
StructType schema,
-            SaveMode mode, DataSourceOptions options) {
+                                                   SaveMode mode, 
DataSourceOptions options) {

Review Comment:
   We don't use this indent style.



##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/FilterExpressionCompiler.scala:
##########
@@ -15,16 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.spark
+package org.apache.phoenix.spark.datasource.v2

Review Comment:
   Generally, do we need the new v2 package in the Spark2 module ?



##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/FilterExpressionCompiler.scala:
##########
@@ -15,16 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.spark
+package org.apache.phoenix.spark.datasource.v2

Review Comment:
   If it is still used in the legacy code, then it may be better not to move it.
   If it is only used by the v2 connector, then it's fine to move.



##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/PhoenixSparkSqlRelation.scala:
##########
@@ -0,0 +1,58 @@
+package org.apache.phoenix.spark.datasource.v2
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, 
SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, 
PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class PhoenixSparkSqlRelation(
+                                    @transient sparkSession: SparkSession,
+                                    params: Map[String, String]
+                                  ) extends BaseRelation with 
PrunedFilteredScan with InsertableRelation {
+
+  override def schema: StructType = dataSourceReader.readSchema()
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  private def dataSourceReader: PhoenixDataSourceReader = new 
PhoenixDataSourceReader(dataSourceOptions)
+
+  private def dataSourceOptions = new DataSourceOptions(params.asJava)
+
+  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+    val requiredSchema =    StructType(requiredColumns.flatMap(c => 
schema.fields.find(_.name == c)))
+
+    val reader: PhoenixDataSourceReader = dataSourceReader
+    reader.pushFilters(filters)
+    reader.pruneColumns(requiredSchema)
+    val rdd = new DataSourceRDD(
+      sqlContext.sparkContext,
+      reader.planInputPartitions().asScala
+    )
+    rdd.map(ir => {
+      new GenericRowWithSchema(ir.toSeq(requiredSchema).toArray, 
requiredSchema)
+    })
+  }
+
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data
+      .write
+      .format("phoenix")
+      .option(PhoenixDataSource.TABLE, params(PhoenixDataSource.TABLE))
+      .option(PhoenixDataSource.JDBC_URL, 
PhoenixDataSource.getJdbcUrlFromOptions(dataSourceOptions))

Review Comment:
   Does this handle the case when jdbcUrl is not specified?



##########
phoenix5-spark3/README.md:
##########
@@ -28,6 +28,18 @@ Apart from the shaded connector JAR, you also need to add 
the hbase mapredcp lib
 (add the exact paths as appropiate to your system)
 Both the `spark.driver.extraClassPath` and `spark.executor.extraClassPath` 
properties need to be set the above classpath. You may add them 
spark-defaults.conf, or specify them on the spark-shell or spark-submit command 
line.
 
+## Configuration properties

Review Comment:
   The same comments apply as to the v2 readme.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to