stoty commented on code in PR #139:
URL:
https://github.com/apache/phoenix-connectors/pull/139#discussion_r1734210681
##########
phoenix5-spark/README.md:
##########
@@ -45,7 +57,7 @@ val spark = SparkSession
val df = spark.sqlContext
.read
.format("phoenix")
- .options(Map("table" -> "TABLE1"))
+ .options(Map("table" -> "TABLE1", "jdbcUrl" -> "jdbc:phoenix:zkHost:zkport"))
Review Comment:
Specifying an explicit jdbcUrl is somewhat of an anti-pattern.
Ideally, the connection info should be taken from hbase-site.xml.
##########
phoenix5-spark/README.md:
##########
@@ -18,6 +18,18 @@ limitations under the License.
phoenix-spark extends Phoenix's MapReduce support to allow Spark to load
Phoenix tables as DataFrames,
and enables persisting DataFrames back to Phoenix.
+## Configuration properties
Review Comment:
Nice
##########
phoenix5-spark/src/it/resources/globalSetup.sql:
##########
@@ -60,7 +60,7 @@ UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
-CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL,
ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR CONSTRAINT pk PRIMARY KEY
(TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
+CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL,
ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR CONSTRAINT pk PRIMARY KEY
(TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true, SALT_BUCKETS = 20
Review Comment:
If we want to test salting, we should create a new table and new tests for
that.
##########
phoenix5-spark/README.md:
##########
@@ -249,14 +307,13 @@ public class PhoenixSparkWriteFromRDDWithSchema {
}
// Create a DataFrame from the rows and the specified schema
- df = spark.createDataFrame(rows, schema);
+ Dataset<Row> df = spark.createDataFrame(rows, schema);
df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "OUTPUT_TABLE")
+ .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.save();
-
Review Comment:
Shouldn't we close the session?
While this is only an example, I think that it should teach proper resource
management.
##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java:
##########
@@ -33,21 +37,27 @@
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;
+import scala.collection.immutable.Map;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
/**
* Implements the DataSourceV2 api to read and write from Phoenix tables
*/
-public class PhoenixDataSource implements DataSourceV2, ReadSupport,
WriteSupport, DataSourceRegister {
+public class PhoenixDataSource implements DataSourceV2, ReadSupport,
WriteSupport, DataSourceRegister, RelationProvider {
private static final Logger logger =
LoggerFactory.getLogger(PhoenixDataSource.class);
public static final String SKIP_NORMALIZING_IDENTIFIER =
"skipNormalizingIdentifier";
@Deprecated
public static final String ZOOKEEPER_URL = "zkUrl";
public static final String JDBC_URL = "jdbcUrl";
public static final String PHOENIX_CONFIGS = "phoenixconfigs";
+ public static final String TABLE = "table";
+ public static final String DATE_AS_TIME_STAMP = "dateAsTimestamp";
+ public static final String DO_NOT_MAP_COLUMN_FAMILY =
"doNotMapColumnFamily";
+ public static final String TENANT_ID = "tenantId";
Review Comment:
Use PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID as the value instead of
repeating the string?
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala:
##########
@@ -14,14 +14,14 @@
package org.apache.phoenix.spark
import java.sql.DriverManager
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.jdbc.PhoenixDriver
import org.apache.phoenix.mapreduce.PhoenixInputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.query.HBaseFactoryProvider
+import org.apache.phoenix.spark.datasource.v2.SparkSchemaUtil
Review Comment:
looks like an unused import.
##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkDatasourceV1IT.scala:
##########
@@ -0,0 +1,807 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
+import org.apache.phoenix.query.QueryServices
+import org.apache.phoenix.schema.types.{PSmallintArray, PVarchar}
+import
org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import
org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource,
PhoenixTestingDataSource, SparkSchemaUtil}
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SaveMode}
+
+import java.sql.DriverManager
+import java.util.Date
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Note: If running directly from an IDE, these are the recommended VM
parameters:
+ * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ */
+class PhoenixSparkDatasourceV1IT extends AbstractPhoenixSparkIT {
Review Comment:
IIRC AbstractPhoenixSparkIT is only used by the V1 tests.
If you rename the tests, it would make sense to also rename the parent class.
##########
phoenix5-spark/README.md:
##########
@@ -89,7 +147,7 @@ public class PhoenixSparkRead {
### Save DataFrames to Phoenix using DataSourceV2
The `save` is method on DataFrame allows passing in a data source type. You
can use
-`phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl`
parameter to
+`phoenix` for DataSourceV2 and must also pass in a `table` and `jdbcUrl`
parameter to
Review Comment:
This is not true anymore (it was already wrong, but since you're updating
the docs, let's fix this).
jdbcUrl is not mandatory.
##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkDatasourceV1ITTenantSpecific.scala:
##########
@@ -105,39 +102,6 @@ class PhoenixSparkITTenantSpecific extends
AbstractPhoenixSparkIT {
verifyResults
}
- ignore("Can write a DataFrame using 'DataFrame.write' to tenant-specific
view - Spark2 sparse columns") {
Review Comment:
I am not sure why this is ignored, but I assume that there is a bug that
this would catch.
It'd be better to keep this.
##########
phoenix5-spark/README.md:
##########
@@ -141,32 +201,32 @@ public class PhoenixSparkWriteFromInputTable {
public static void main() throws Exception {
SparkConf sparkConf = new
SparkConf().setMaster("local").setAppName("phoenix-test")
- .set("spark.hadoopRDD.ignoreEmptySplits", "false");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- SQLContext sqlContext = new SQLContext(jsc);
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
+ SparkSessinon spark =
SparkSession.builder().config(sparkConf).getOrCreate();
// Load INPUT_TABLE
- Dataset<Row> df = sqlContext
+ Dataset<Row> df = spark
.read()
.format("phoenix")
.option("table", "INPUT_TABLE")
+ .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.load();
// Save to OUTPUT_TABLE
df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "OUTPUT_TABLE")
+ .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
.save();
- jsc.stop();
}
}
```
### Save from an external RDD with a schema to a Phoenix table
Just like the previous example, you can pass in the data source type as
`phoenix` and specify the `table` and
-`zkUrl` parameters indicating which table and server to persist the DataFrame
to.
+`jdbcUrl` parameters indicating which table and server to persist the
DataFrame to.
Review Comment:
same, jdbcUrl is optional.
##########
phoenix5-spark/README.md:
##########
@@ -65,21 +77,67 @@ public class PhoenixSparkRead {
public static void main() throws Exception {
SparkConf sparkConf = new
SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- SQLContext sqlContext = new SQLContext(jsc);
+ SparkSessinon spark =
SparkSession.builder().config(sparkConf).getOrCreate();
// Load data from TABLE1
- Dataset<Row> df = sqlContext
+ Dataset<Row> df = spark
.read()
.format("phoenix")
.option("table", "TABLE1")
+ .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
Review Comment:
see above, this applies to all uses with explicit jdbcUrl
##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java:
##########
@@ -207,6 +208,27 @@ public Filter[] pushedFilters() {
@Override
public void pruneColumns(StructType schema) {
- this.schema = schema;
+ if (schema.fields() != null && schema.fields().length != 0)
+ this.schema = schema;
+ }
+
+ //TODO Method PhoenixRuntime.generateColumnInfo skip only salt column, add
skip tenant_id column.
+ private List<ColumnInfo> generateColumnInfo(Connection conn, String
tableName) throws SQLException {
Review Comment:
You may want to consider exposing this functionality in PhoenixRuntime, or
the Phoenix MR classes, if it is not yet exposed.
This is for later.
##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkSqlIT.scala:
##########
@@ -0,0 +1,95 @@
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.spark.sql.Row
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Note: If running directly from an IDE, these are the recommended VM
parameters:
+ * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ */
+class PhoenixSparkSqlIT extends AbstractPhoenixSparkIT {
+ val sqlTableName = "SQL_TABLE"
+
+ after {
+ spark.sql(s"DROP TABLE IF EXISTS $sqlTableName")
+ }
+
+ test("Can read from table using spark-sql") {
+ val expected : Array[Row] = Array(
+ Row.fromSeq(Seq(1, "test_row_1")),
+ Row.fromSeq(Seq(2, "test_row_2"))
+ )
+
+ spark.sql(s"CREATE TABLE $sqlTableName USING phoenix " +
+ s"OPTIONS ('table' 'TABLE1', '${PhoenixDataSource.JDBC_URL}'
'$jdbcUrl')")
+
+ val dataFrame = spark.sql(s"SELECT * FROM $sqlTableName")
+
+ dataFrame.show()
+
+ dataFrame.collect() shouldEqual expected
+ }
+
+ test("Can read from table using spark-sql with where clause and selecting
specific columns`") {
+ val expected : Array[Row] = Array(
+ Row.fromSeq(Seq("test_row_1"))
+ )
+
+ spark.sql(s"CREATE TABLE $sqlTableName USING phoenix " +
+ s"OPTIONS ('table' 'TABLE1', '${PhoenixDataSource.JDBC_URL}'
'$jdbcUrl')")
+
+ val dataFrame = spark.sql(s"SELECT COL1 as LABEL FROM $sqlTableName where
ID=1")
+ dataFrame.show
Review Comment:
While these may be useful for debugging, in general we shouldn't print to
stdout in tests.
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala:
##########
@@ -18,8 +18,9 @@
package org.apache.phoenix.spark
import org.apache.hadoop.conf.Configuration
+import org.apache.phoenix.spark.datasource.v2.FilterExpressionCompiler
Review Comment:
looks like an unused import
##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java:
##########
@@ -56,7 +66,7 @@ public DataSourceReader createReader(DataSourceOptions
options) {
@Override
public Optional<DataSourceWriter> createWriter(String writeUUID,
StructType schema,
- SaveMode mode, DataSourceOptions options) {
+ SaveMode mode,
DataSourceOptions options) {
Review Comment:
We don't use this indent style.
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/FilterExpressionCompiler.scala:
##########
@@ -15,16 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.spark
+package org.apache.phoenix.spark.datasource.v2
Review Comment:
Generally, do we need the new v2 package in the Spark2 module ?
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/FilterExpressionCompiler.scala:
##########
@@ -15,16 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.spark
+package org.apache.phoenix.spark.datasource.v2
Review Comment:
If it is still used in the legacy code, then it may be better not to move it.
If it is only used by the v2 connector, then it's fine to move.
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/PhoenixSparkSqlRelation.scala:
##########
@@ -0,0 +1,58 @@
+package org.apache.phoenix.spark.datasource.v2
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode,
SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation,
PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class PhoenixSparkSqlRelation(
+ @transient sparkSession: SparkSession,
+ params: Map[String, String]
+ ) extends BaseRelation with
PrunedFilteredScan with InsertableRelation {
+
+ override def schema: StructType = dataSourceReader.readSchema()
+
+ override def sqlContext: SQLContext = sparkSession.sqlContext
+
+ private def dataSourceReader: PhoenixDataSourceReader = new
PhoenixDataSourceReader(dataSourceOptions)
+
+ private def dataSourceOptions = new DataSourceOptions(params.asJava)
+
+ override def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
+ val requiredSchema = StructType(requiredColumns.flatMap(c =>
schema.fields.find(_.name == c)))
+
+ val reader: PhoenixDataSourceReader = dataSourceReader
+ reader.pushFilters(filters)
+ reader.pruneColumns(requiredSchema)
+ val rdd = new DataSourceRDD(
+ sqlContext.sparkContext,
+ reader.planInputPartitions().asScala
+ )
+ rdd.map(ir => {
+ new GenericRowWithSchema(ir.toSeq(requiredSchema).toArray,
requiredSchema)
+ })
+ }
+
+
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ data
+ .write
+ .format("phoenix")
+ .option(PhoenixDataSource.TABLE, params(PhoenixDataSource.TABLE))
+ .option(PhoenixDataSource.JDBC_URL,
PhoenixDataSource.getJdbcUrlFromOptions(dataSourceOptions))
Review Comment:
does this handle the case when jdbcUrl is not specified ?
##########
phoenix5-spark3/README.md:
##########
@@ -28,6 +28,18 @@ Apart from the shaded connector JAR, you also need to add
the hbase mapredcp lib
(add the exact paths as appropiate to your system)
Both the `spark.driver.extraClassPath` and `spark.executor.extraClassPath`
properties need to be set the above classpath. You may add them
spark-defaults.conf, or specify them on the spark-shell or spark-submit command
line.
+## Configuration properties
Review Comment:
The same comments apply here as to the v2 readme.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]