This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push: new 5aedf30 PHOENIX-6633 Use standard JDBC URL in Spark Connector and make it optional 5aedf30 is described below commit 5aedf308ffcaf76ba85bfc2efdb55a8fb72c991e Author: Istvan Toth <st...@apache.org> AuthorDate: Mon Sep 25 08:16:06 2023 +0200 PHOENIX-6633 Use standard JDBC URL in Spark Connector and make it optional --- phoenix5-spark/README.md | 46 ++++---- phoenix5-spark/pom.xml | 21 +++- .../org/apache/phoenix/spark/DataSourceApiIT.java | 124 +++++++++++++++++---- .../spark/datasource/v2/PhoenixDataSource.java | 32 ++++++ .../v2/reader/PhoenixDataSourceReadOptions.java | 12 +- .../v2/reader/PhoenixDataSourceReader.java | 24 ++-- .../v2/reader/PhoenixInputPartitionReader.java | 10 +- .../v2/writer/PhoenixDataSourceWriteOptions.java | 28 ++--- .../v2/writer/PhoenixDataSourceWriter.java | 12 +- .../datasource/v2/writer/PhoenixDataWriter.java | 7 +- .../org/apache/phoenix/spark/DataSourceApiIT.java | 120 ++++++++++++++++---- .../sql/connector/PhoenixTestingDataSource.java | 8 +- phoenix5-spark3/README.md | 40 ++++--- .../spark/sql/connector/PhoenixDataSource.java | 38 +++++-- .../reader/PhoenixDataSourceReadOptions.java | 12 +- .../connector/reader/PhoenixPartitionReader.java | 10 +- .../spark/sql/connector/reader/PhoenixScan.java | 14 +-- .../sql/connector/writer/PhoenixBatchWrite.java | 6 +- .../writer/PhoenixDataSourceWriteOptions.java | 28 ++--- .../sql/connector/writer/PhoenixDataWriter.java | 7 +- .../spark/sql/connector/PhoenixDataSourceTest.java | 27 +++++ pom.xml | 35 ++---- 22 files changed, 427 insertions(+), 234 deletions(-) diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md index 2540b73..f443cb0 100644 --- a/phoenix5-spark/README.md +++ b/phoenix5-spark/README.md @@ -33,7 +33,6 @@ Scala example: ```scala import org.apache.spark.SparkContext import org.apache.spark.sql.{SQLContext, SparkSession} -import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource val spark = SparkSession .builder() @@ -45,7 +44,7 @@ val spark = SparkSession val df = spark.sqlContext .read .format("phoenix") - .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181")) + .options(Map("table" -> "TABLE1")) .load df.filter(df("COL1") === "test_row_1" && df("ID") === 1L) @@ -60,8 +59,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; - public class PhoenixSparkRead { public static void main() throws Exception { @@ -74,7 +71,6 @@ public class PhoenixSparkRead { .read() .format("phoenix") .option("table", "TABLE1") - .option(ZOOKEEPER_URL, "phoenix-server:2181") .load(); df.createOrReplaceTempView("TABLE1"); @@ -108,7 +104,6 @@ you can load from an input table and save to an output table as a DataFrame as f ```scala import org.apache.spark.SparkContext import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode} -import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource val spark = SparkSession .builder() @@ -120,14 +115,14 @@ val spark = SparkSession val df = spark.sqlContext .read .format("phoenix") - .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181")) + .options(Map("table" -> "INPUT_TABLE")) .load // Save to OUTPUT_TABLE df.write .format("phoenix") .mode(SaveMode.Overwrite) - .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181")) + 
.options(Map("table" -> "OUTPUT_TABLE")) .save() ``` Java example: @@ -139,8 +134,6 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SQLContext; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; - public class PhoenixSparkWriteFromInputTable { public static void main() throws Exception { @@ -153,7 +146,6 @@ public class PhoenixSparkWriteFromInputTable { .read() .format("phoenix") .option("table", "INPUT_TABLE") - .option(ZOOKEEPER_URL, "phoenix-server:2181") .load(); // Save to OUTPUT_TABLE @@ -161,7 +153,6 @@ public class PhoenixSparkWriteFromInputTable { .format("phoenix") .mode(SaveMode.Overwrite) .option("table", "OUTPUT_TABLE") - .option(ZOOKEEPER_URL, "phoenix-server:2181") .save(); jsc.stop(); } @@ -187,7 +178,6 @@ you can save a dataframe from an RDD as follows in Scala: import org.apache.spark.SparkContext import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField} import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode} -import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource val spark = SparkSession .builder() @@ -209,7 +199,7 @@ val df = spark.sqlContext.createDataFrame(rowRDD, schema) df.write .format("phoenix") - .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181")) + .options(Map("table" -> "OUTPUT_TABLE")) .mode(SaveMode.Overwrite) .save() ``` @@ -230,8 +220,6 @@ import org.apache.spark.sql.types.StructType; import java.util.ArrayList; import java.util.List; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; - public class PhoenixSparkWriteFromRDDWithSchema { public static void main() throws Exception { @@ -260,7 +248,6 @@ public class PhoenixSparkWriteFromRDDWithSchema { .format("phoenix") .mode(SaveMode.Overwrite) .option("table", "OUTPUT_TABLE") - .option(ZOOKEEPER_URL, "phoenix-server:2181") .save(); jsc.stop(); @@ -270,13 +257,21 @@ public class PhoenixSparkWriteFromRDDWithSchema { ## Notes -- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"` - instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`. -- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support +- The DataSourceV2 based "phoenix" data source accepts the `"jdbcUrl"` parameter, which can be +used to override the default Hbase/Phoenix instance specified in hbase-site.xml. It also accepts +the deprected `zkUrl` parameter for backwards compatibility purposes. If neither is specified, +it falls back to using connection defined by hbase-site.xml. +- `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkport", +while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort" +- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"` +instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`. +The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter, +only `"zkUrl"` +- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and +`saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings, -as well as an optional `zkUrl` parameter for the Phoenix connection URL. 
-- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set -in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified. +as well as an optional `zkUrl` parameter. + - As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver to executors as a comma-separated list against the key `phoenixConfigs` i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex: ```scala @@ -284,10 +279,11 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho .sqlContext .read .format("phoenix") - .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000")) + .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000")) .load; ``` - This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`. + This list of properties is parsed and populated into a properties map which is passed to + `DriverManager.getConnection(connString, propsMap)`. Note that the same property values will be used for both the driver and all executors and these configurations are used each time a connection is made (both on the driver and executors). diff --git a/phoenix5-spark/pom.xml b/phoenix5-spark/pom.xml index 9cd8ff3..35b1d83 100644 --- a/phoenix5-spark/pom.xml +++ b/phoenix5-spark/pom.xml @@ -446,6 +446,25 @@ <build> <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-test-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/it/java</source> + <source>src/it/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> <plugin> <artifactId>maven-dependency-plugin</artifactId> <configuration> @@ -513,7 +532,7 @@ to https://github.com/junit-team/junit4/issues/1223 --> <parallel>false</parallel> <tagsToExclude>Integration-Test</tagsToExclude> - <argLine>--XX:ReservedCodeCacheSize=512m ${argLine}</argLine> + <argLine>-XX:ReservedCodeCacheSize=512m ${argLine}</argLine> </configuration> </execution> </executions> diff --git a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java similarity index 50% copy from phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java copy to phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java index dbcf45f..c6a4465 100644 --- a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java +++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java @@ -17,7 +17,11 @@ */ package org.apache.phoenix.spark; +import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; +import org.apache.phoenix.util.InstanceResolver; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; @@ -25,13 +29,17 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import 
org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import java.sql.*; import java.util.Arrays; -import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.ZOOKEEPER_URL; +import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.JDBC_URL; +import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.junit.Assert.*; public class DataSourceApiIT extends ParallelStatsDisabledIT { @@ -40,46 +48,114 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT { super(); } + @BeforeClass + public static synchronized void setupInstanceResolver() { + //FIXME This should be called whenever we set up a miniCluster, deep in BaseTest + InstanceResolver.clearSingletons(); + // Make sure the ConnectionInfo in the tool doesn't try to pull a default Configuration + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + @Override + public Configuration getConfiguration() { + return new Configuration(config); + } + + @Override + public Configuration getConfiguration(Configuration confToClone) { + Configuration copy = new Configuration(config); + copy.addResource(confToClone); + return copy; + } + }); + } + @Test - public void basicWriteTest() throws SQLException { + public void basicWriteAndReadBackTest() throws SQLException { SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(jsc); String tableName = generateUniqueName(); try (Connection conn = DriverManager.getConnection(getUrl()); - Statement stmt = conn.createStatement()){ - stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)"); + Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)"); } - try(SparkSession spark = sqlContext.sparkSession()) { - - StructType schema = new StructType(new StructField[]{ - new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("v1", DataTypes.StringType, false, Metadata.empty()) - }); - - Dataset<Row> df = spark.createDataFrame( - Arrays.asList( - RowFactory.create(1, "x")), - schema); - - df.write() - .format("phoenix") - .mode(SaveMode.Append) - .option("table", tableName) - .option(ZOOKEEPER_URL, getUrl()) - .save(); + try (SparkSession spark = sqlContext.sparkSession()) { + + StructType schema = + new StructType(new StructField[] { + new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("v1", DataTypes.StringType, false, Metadata.empty()) }); + + // Use old zkUrl + Dataset<Row> df1 = + spark.createDataFrame( + Arrays.asList(RowFactory.create(1, "x")), + schema); + + df1.write().format("phoenix").mode(SaveMode.Overwrite) + .option("table", tableName) + .option(ZOOKEEPER_URL, getUrl()) + .save(); + + // Use jdbcUrl + Dataset<Row> df2 = + spark.createDataFrame( + Arrays.asList(RowFactory.create(2, "x")), + schema); + + df2.write().format("phoenix").mode(SaveMode.Overwrite) + .option("table", tableName) + .option(JDBC_URL, JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + getUrl()) + .save(); + + // Use default from hbase-site.xml + Dataset<Row> df3 = + 
spark.createDataFrame( + Arrays.asList(RowFactory.create(3, "x")), + schema); + + df3.write().format("phoenix").mode(SaveMode.Overwrite) + .option("table", tableName) + .save(); try (Connection conn = DriverManager.getConnection(getUrl()); - Statement stmt = conn.createStatement()) { + Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertEquals("x", rs.getString(2)); assertFalse(rs.next()); } + Dataset df1Read = spark.read().format("phoenix") + .option("table", tableName) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + + assertEquals(3l, df1Read.count()); + + // Use jdbcUrl + Dataset df2Read = spark.read().format("phoenix") + .option("table", tableName) + .option(PhoenixDataSource.JDBC_URL, + JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + getUrl()) + .load(); + + assertEquals(3l, df2Read.count()); + + // Use default + Dataset df3Read = spark.read().format("phoenix") + .option("table", tableName) + .load(); + + assertEquals(3l, df3Read.count()); } finally { jsc.stop(); @@ -116,7 +192,7 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT { df.write() .format("phoenix") - .mode(SaveMode.Append) + .mode(SaveMode.Overwrite) .option("table", tableName) .option(ZOOKEEPER_URL, getUrl()) .save(); diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java index 792e8f8..de773d5 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java @@ -34,6 +34,9 @@ import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.types.StructType; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; + /** * Implements the DataSourceV2 api to read and write from Phoenix tables */ @@ -41,7 +44,9 @@ public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupp private static final Logger logger = LoggerFactory.getLogger(PhoenixDataSource.class); public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier"; + @Deprecated public static final String ZOOKEEPER_URL = "zkUrl"; + public static final String JDBC_URL = "jdbcUrl"; public static final String PHOENIX_CONFIGS = "phoenixconfigs"; @Override @@ -55,6 +60,33 @@ public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupp return Optional.of(new PhoenixDataSourceWriter(mode, schema, options)); } + public static String getJdbcUrlFromOptions(final DataSourceOptions options) { + if (options.get(JDBC_URL).orElse(null) != null + && options.get(ZOOKEEPER_URL).orElse(null) != null) { + throw new RuntimeException("If " + JDBC_URL + " is specified, then " + ZOOKEEPER_URL + + " must not be specified"); + } + + String jdbcUrl = options.get(JDBC_URL).orElse(null); + String zkUrl = options.get(ZOOKEEPER_URL).orElse(null); + // Backward compatibility logic + if (jdbcUrl == null) { + if (zkUrl != null) { + if (zkUrl.startsWith(JDBC_PROTOCOL)) { 
+ // full URL specified, use it + jdbcUrl = zkUrl; + } else { + // backwards compatibility, assume ZK, and missing protocol + // Don't use the specific protocol, as we need to work with older releases. + jdbcUrl = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl; + } + } else { + jdbcUrl = JDBC_PROTOCOL; + } + } + return jdbcUrl; + } + /** * Extract HBase and Phoenix properties that need to be set in both the driver and workers. * We expect these properties to be passed against the key diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java index 003a34d..3d14292 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java @@ -23,18 +23,18 @@ import java.util.Properties; class PhoenixDataSourceReadOptions implements Serializable { private final String tenantId; - private final String zkUrl; + private final String jdbcUrl; private final String scn; private final String selectStatement; private final Properties overriddenProps; private final byte[] pTableCacheBytes; - PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, + PhoenixDataSourceReadOptions(String jdbcUrl, String scn, String tenantId, String selectStatement, Properties overriddenProps, byte[] pTableCacheBytes) { - if(overriddenProps == null){ + if (overriddenProps == null){ throw new NullPointerException(); } - this.zkUrl = zkUrl; + this.jdbcUrl = jdbcUrl; this.scn = scn; this.tenantId = tenantId; this.selectStatement = selectStatement; @@ -50,8 +50,8 @@ class PhoenixDataSourceReadOptions implements Serializable { return scn; } - String getZkUrl() { - return zkUrl; + String getJdbcUrl() { + return jdbcUrl; } String getTenantId() { diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java index b839548..a7aca22 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java @@ -61,16 +61,12 @@ import java.util.List; import java.util.Optional; import java.util.Properties; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; - public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns { private final DataSourceOptions options; private final String tableName; - private final String zkUrl; + private final String jdbcUrl; private final boolean dateAsTimestamp; private final Properties overriddenProps; @@ -83,14 +79,12 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo if (!options.tableName().isPresent()) { throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined"); } - if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) { - throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " 
defined"); - } + this.options = options; this.tableName = options.tableName().get(); - this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get(); + this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options); this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false); - this.overriddenProps = extractPhoenixHBaseConfFromOptions(options); + this.overriddenProps = PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options); setSchema(); } @@ -98,8 +92,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo * Sets the schema using all the table columns before any column pruning has been done */ private void setSchema() { - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); @@ -134,14 +127,13 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo // Generate splits based off statistics, or just region splits? boolean splitByStats = options.getBoolean( PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); - if(currentScnValue.isPresent()) { + if (currentScnValue.isPresent()) { overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get()); } if (tenantId.isPresent()){ overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get()); } - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>( Arrays.asList(schema.names()))); final Statement statement = conn.createStatement(); @@ -188,7 +180,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()). 
toByteArray(); PhoenixDataSourceReadOptions phoenixDataSourceOptions = - new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null), + new PhoenixDataSourceReadOptions(jdbcUrl, currentScnValue.orElse(null), tenantId.orElse(null), selectStatement, overriddenProps, pTableCacheBytes); if (splitByStats) { diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java index a58cfa0..a7f6240 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java @@ -62,9 +62,6 @@ import org.apache.spark.sql.types.StructType; import scala.collection.Iterator; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; - public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> { private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit; @@ -89,7 +86,7 @@ public class PhoenixInputPartitionReader implements InputPartitionReader<Interna private QueryPlan getQueryPlan() throws SQLException { String scn = options.getScn(); String tenantId = options.getTenantId(); - String zkUrl = options.getZkUrl(); + String jdbcUrl = options.getJdbcUrl(); Properties overridingProps = getOverriddenPropsFromOptions(); if (scn != null) { overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); @@ -97,8 +94,7 @@ public class PhoenixInputPartitionReader implements InputPartitionReader<Interna if (tenantId != null) { overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, overridingProps)) { PTable pTable = null; try { pTable = PTable.parseFrom(options.getPTableCacheBytes()); @@ -185,7 +181,7 @@ public class PhoenixInputPartitionReader implements InputPartitionReader<Interna @Override public void close() throws IOException { - if(resultSet != null) { + if (resultSet != null) { try { resultSet.close(); } catch (SQLException e) { diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java index cc7f459..7f1bb5f 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java @@ -25,30 +25,30 @@ import java.util.Properties; class PhoenixDataSourceWriteOptions implements Serializable { private final String tableName; - private final String zkUrl; + private final String jdbcUrl; private final String tenantId; private final String scn; private final StructType schema; private final boolean skipNormalizingIdentifier; private final Properties overriddenProps; - private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, + private PhoenixDataSourceWriteOptions(String tableName, String jdbcUrl, String scn, String tenantId, StructType schema, boolean skipNormalizingIdentifier, Properties overriddenProps) { if 
(tableName == null) { - throw new NullPointerException(); + throw new IllegalArgumentException("tableName must not be null"); } - if (zkUrl == null) { - throw new NullPointerException(); + if (jdbcUrl == null) { + throw new IllegalArgumentException("jdbcUrl must not be null"); } if (schema == null) { - throw new NullPointerException(); + throw new IllegalArgumentException("schema must not be null"); } if (overriddenProps == null) { - throw new NullPointerException(); + throw new IllegalArgumentException("overriddenProps must not be null"); } this.tableName = tableName; - this.zkUrl = zkUrl; + this.jdbcUrl = jdbcUrl; this.scn = scn; this.tenantId = tenantId; this.schema = schema; @@ -60,8 +60,8 @@ class PhoenixDataSourceWriteOptions implements Serializable { return scn; } - String getZkUrl() { - return zkUrl; + String getJdbcUrl() { + return jdbcUrl; } String getTenantId() { @@ -86,7 +86,7 @@ class PhoenixDataSourceWriteOptions implements Serializable { static class Builder { private String tableName; - private String zkUrl; + private String jdbcUrl; private String scn; private String tenantId; private StructType schema; @@ -98,8 +98,8 @@ class PhoenixDataSourceWriteOptions implements Serializable { return this; } - Builder setZkUrl(String zkUrl) { - this.zkUrl = zkUrl; + Builder setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; return this; } @@ -129,7 +129,7 @@ class PhoenixDataSourceWriteOptions implements Serializable { } PhoenixDataSourceWriteOptions build() { - return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, + return new PhoenixDataSourceWriteOptions(tableName, jdbcUrl, scn, tenantId, schema, skipNormalizingIdentifier, overriddenProps); } } diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java index 31a3065..08fe886 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.spark.datasource.v2.writer; +import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.catalyst.InternalRow; @@ -28,8 +29,6 @@ import org.apache.spark.sql.types.StructType; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; public class PhoenixDataSourceWriter implements DataSourceWriter { @@ -42,9 +41,6 @@ public class PhoenixDataSourceWriter implements DataSourceWriter { if (!options.tableName().isPresent()) { throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined"); } - if (!options.get(ZOOKEEPER_URL).isPresent()) { - throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined"); - } this.options = createPhoenixDataSourceWriteOptions(options, schema); } @@ -74,16 +70,16 @@ public class PhoenixDataSourceWriter implements DataSourceWriter { StructType schema) { String scn = 
options.get(CURRENT_SCN_VALUE).orElse(null); String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null); - String zkUrl = options.get(ZOOKEEPER_URL).get(); + String jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options); boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false); return new PhoenixDataSourceWriteOptions.Builder() .setTableName(options.tableName().get()) - .setZkUrl(zkUrl) + .setJdbcUrl(jdbcUrl) .setScn(scn) .setTenantId(tenantId) .setSchema(schema) .setSkipNormalizingIdentifier(skipNormalizingIdentifier) - .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options)) + .setOverriddenProps(PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options)) .build(); } } diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java index fd85d66..fabbe18 100644 --- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java +++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java @@ -51,8 +51,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; public class PhoenixDataWriter implements DataWriter<InternalRow> { @@ -67,7 +65,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> { PhoenixDataWriter(PhoenixDataSourceWriteOptions options) { String scn = options.getScn(); String tenantId = options.getTenantId(); - String zkUrl = options.getZkUrl(); + String jdbcUrl = options.getJdbcUrl(); Properties overridingProps = options.getOverriddenProps(); if (scn != null) { overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); @@ -84,8 +82,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> { } encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind( scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(), SimpleAnalyzer$.MODULE$); try { - this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, - overridingProps); + this.conn = DriverManager.getConnection(jdbcUrl, overridingProps); List<String> colNames = new ArrayList<>(Arrays.asList(options.getSchema().names())); if (!options.skipNormalizingIdentifier()){ colNames = colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList()); diff --git a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java index dbcf45f..774454f 100644 --- a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java +++ b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java @@ -17,7 +17,11 @@ */ package org.apache.phoenix.spark; +import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.spark.sql.connector.PhoenixDataSource; +import org.apache.phoenix.util.InstanceResolver; import org.apache.spark.SparkConf; import 
org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; @@ -25,13 +29,17 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import java.sql.*; import java.util.Arrays; +import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.JDBC_URL; import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.ZOOKEEPER_URL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.junit.Assert.*; public class DataSourceApiIT extends ParallelStatsDisabledIT { @@ -40,46 +48,114 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT { super(); } + @BeforeClass + public static synchronized void setupInstanceResolver() { + //FIXME This should be called whenever we set up a miniCluster, deep in BaseTest + InstanceResolver.clearSingletons(); + // Make sure the ConnectionInfo in the tool doesn't try to pull a default Configuration + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + @Override + public Configuration getConfiguration() { + return new Configuration(config); + } + + @Override + public Configuration getConfiguration(Configuration confToClone) { + Configuration copy = new Configuration(config); + copy.addResource(confToClone); + return copy; + } + }); + } + @Test - public void basicWriteTest() throws SQLException { + public void basicWriteAndReadBackTest() throws SQLException { SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(jsc); String tableName = generateUniqueName(); try (Connection conn = DriverManager.getConnection(getUrl()); - Statement stmt = conn.createStatement()){ - stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)"); + Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)"); } - try(SparkSession spark = sqlContext.sparkSession()) { - - StructType schema = new StructType(new StructField[]{ - new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("v1", DataTypes.StringType, false, Metadata.empty()) - }); - - Dataset<Row> df = spark.createDataFrame( - Arrays.asList( - RowFactory.create(1, "x")), - schema); - - df.write() - .format("phoenix") - .mode(SaveMode.Append) - .option("table", tableName) - .option(ZOOKEEPER_URL, getUrl()) - .save(); + try (SparkSession spark = sqlContext.sparkSession()) { + + StructType schema = + new StructType(new StructField[] { + new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("v1", DataTypes.StringType, false, Metadata.empty()) }); + + // Use old zkUrl + Dataset<Row> df1 = + spark.createDataFrame( + Arrays.asList(RowFactory.create(1, "x")), + schema); + + df1.write().format("phoenix").mode(SaveMode.Append) + .option("table", tableName) + .option(ZOOKEEPER_URL, getUrl()) + .save(); + + // Use jdbcUrl + Dataset<Row> df2 = + spark.createDataFrame( + Arrays.asList(RowFactory.create(2, "x")), + schema); + + df2.write().format("phoenix").mode(SaveMode.Append) + .option("table", tableName) + .option(JDBC_URL, JDBC_PROTOCOL + 
JDBC_PROTOCOL_SEPARATOR + getUrl()) + .save(); + + // Use default from hbase-site.xml + Dataset<Row> df3 = + spark.createDataFrame( + Arrays.asList(RowFactory.create(3, "x")), + schema); + + df3.write().format("phoenix").mode(SaveMode.Append) + .option("table", tableName) + .save(); try (Connection conn = DriverManager.getConnection(getUrl()); - Statement stmt = conn.createStatement()) { + Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertEquals("x", rs.getString(2)); assertFalse(rs.next()); } + Dataset df1Read = spark.read().format("phoenix") + .option("table", tableName) + .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load(); + + assertEquals(3l, df1Read.count()); + + // Use jdbcUrl + Dataset df2Read = spark.read().format("phoenix") + .option("table", tableName) + .option(PhoenixDataSource.JDBC_URL, + JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + getUrl()) + .load(); + + assertEquals(3l, df2Read.count()); + + // Use default + Dataset df3Read = spark.read().format("phoenix") + .option("table", tableName) + .load(); + + assertEquals(3l, df3Read.count()); } finally { jsc.stop(); diff --git a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java index cea6678..4860154 100644 --- a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java +++ b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java @@ -34,9 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; - public class PhoenixTestingDataSource extends PhoenixDataSource { public static final String TEST_SOURCE = @@ -45,11 +42,10 @@ public class PhoenixTestingDataSource extends PhoenixDataSource { @Override public StructType inferSchema(CaseInsensitiveStringMap options) { String tableName = options.get("table"); - String zkUrl = options.get(ZOOKEEPER_URL); + String jdbcUrl = getJdbcUrlFromOptions(options); boolean dateAsTimestamp = Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", Boolean.toString(false))); Properties overriddenProps = extractPhoenixHBaseConfFromOptions(options); - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md index 40e769b..824886d 100644 --- a/phoenix5-spark3/README.md +++ b/phoenix5-spark3/README.md @@ -54,7 +54,7 @@ val spark = SparkSession val df = spark.sqlContext .read .format("phoenix") - .options(Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")) + .options(Map("table" -> 
"TABLE1")) .load df.filter(df("COL1") === "test_row_1" && df("ID") === 1L) @@ -81,7 +81,6 @@ public class PhoenixSparkRead { .read() .format("phoenix") .option("table", "TABLE1") - .option("zkUrl", "phoenix-server:2181") .load(); df.createOrReplaceTempView("TABLE1"); @@ -126,14 +125,14 @@ val spark = SparkSession val df = spark.sqlContext .read .format("phoenix") - .options(Map("table" -> "INPUT_TABLE", "zkUrl" -> "phoenix-server:2181")) + .options(Map("table" -> "INPUT_TABLE")) .load // Save to OUTPUT_TABLE df.write .format("phoenix") .mode(SaveMode.Append) - .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181")) + .options(Map("table" -> "OUTPUT_TABLE")) .save() ``` Java example: @@ -157,7 +156,6 @@ public class PhoenixSparkWriteFromInputTable { .read() .format("phoenix") .option("table", "INPUT_TABLE") - .option("zkUrl", "phoenix-server:2181") .load(); // Save to OUTPUT_TABLE @@ -165,7 +163,6 @@ public class PhoenixSparkWriteFromInputTable { .format("phoenix") .mode(SaveMode.Append) .option("table", "OUTPUT_TABLE") - .option("zkUrl", "phoenix-server:2181") .save(); jsc.stop(); } @@ -212,7 +209,7 @@ val df = spark.sqlContext.createDataFrame(rowRDD, schema) df.write .format("phoenix") - .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181")) + .options(Map("table" -> "OUTPUT_TABLE")) .mode(SaveMode.Append) .save() ``` @@ -261,7 +258,6 @@ public class PhoenixSparkWriteFromRDDWithSchema { .format("phoenix") .mode(SaveMode.Append) .option("table", "OUTPUT_TABLE") - .option("zkUrl", "phoenix-server:2181") .save(); jsc.stop(); @@ -271,13 +267,21 @@ public class PhoenixSparkWriteFromRDDWithSchema { ## Notes -- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"` - instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`. -- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support +- The DataSourceV2 based "phoenix" data source accepts the `"jdbcUrl"` parameter, which can be +used to override the default Hbase/Phoenix instance specified in hbase-site.xml. It also accepts +the deprected `zkUrl` parameter for backwards compatibility purposes. If neither is specified, +it falls back to using connection defined by hbase-site.xml. +- `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkport", +while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort" +- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"` +instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`. +The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter, +only `"zkUrl"` +- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and +`saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings, -as well as an optional `zkUrl` parameter for the Phoenix connection URL. -- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set -in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified. +as well as an optional `zkUrl` parameter. 
+ - As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver to executors as a comma-separated list against the key `phoenixConfigs` i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex: ```scala @@ -285,7 +289,7 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho .sqlContext .read .format("phoenix") - .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000")) + .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000")) .load; ``` This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`. @@ -299,12 +303,6 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho create the DataFrame or RDD directly if you need fine-grained configuration. - No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html) -## Limitations of the Spark3 connector comapred to the Spark2 Connector - -- Non-uppercase column names cannot be used for mapping DataFrames. (PHOENIX-6668) -- When writing to a DataFrame, every SQL column in the table must be specified. (PHOENIX-6667) - - ## Deprecated Usages ### Load as a DataFrame directly using a Configuration object diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java index 4fc5ad4..c3c0ade 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java +++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java @@ -48,23 +48,20 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister { private static final Logger logger = LoggerFactory.getLogger(PhoenixDataSource.class); public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier"; + @Deprecated public static final String ZOOKEEPER_URL = "zkUrl"; + public static final String JDBC_URL = "jdbcUrl"; public static final String PHOENIX_CONFIGS = "phoenixconfigs"; protected StructType schema; - private CaseInsensitiveStringMap options; @Override public StructType inferSchema(CaseInsensitiveStringMap options){ if (options.get("table") == null) { throw new RuntimeException("No Phoenix option " + "Table" + " defined"); } - if (options.get(ZOOKEEPER_URL) == null) { - throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined"); - } - this.options = options; + String jdbcUrl = getJdbcUrlFromOptions(options); String tableName = options.get("table"); - String zkUrl = options.get(ZOOKEEPER_URL); String tenant = options.get(PhoenixRuntime.TENANT_ID_ATTRIB); boolean dateAsTimestamp = Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", Boolean.toString(false))); Properties overriddenProps = extractPhoenixHBaseConfFromOptions(options); @@ -75,8 +72,7 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister { /** * Sets the schema using all the table columns before any column pruning has been done */ - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) { 
List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); @@ -87,6 +83,32 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister { return schema; } + public static String getJdbcUrlFromOptions(Map<String, String> options) { + if (options.get(JDBC_URL) != null && options.get(ZOOKEEPER_URL) != null) { + throw new RuntimeException("If " + JDBC_URL + " is specified, then " + ZOOKEEPER_URL + + " must not be specified"); + } + + String jdbcUrl = options.get(JDBC_URL); + String zkUrl = options.get(ZOOKEEPER_URL); + // Backward compatibility logic + if (jdbcUrl == null) { + if (zkUrl != null) { + if (zkUrl.startsWith(JDBC_PROTOCOL)) { + // full URL specified, use it + jdbcUrl = zkUrl; + } else { + // backwards compatibility, assume ZK, and missing protocol + // Don't use the specific protocol, as we need to work with older releases. + jdbcUrl = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl; + } + } else { + jdbcUrl = JDBC_PROTOCOL; + } + } + return jdbcUrl; + } + @Override public Table getTable( StructType schema, Transform[] transforms, Map<String, String> properties) { diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java index 4d49150..050663d 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java +++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java @@ -25,19 +25,19 @@ import java.util.Properties; class PhoenixDataSourceReadOptions implements Serializable { private final String tenantId; - private final String zkUrl; + private final String jdbcUrl; private final String scn; private final String selectStatement; private final Properties overriddenProps; private final byte[] pTableCacheBytes; - PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, + PhoenixDataSourceReadOptions(String jdbcUrl, String scn, String tenantId, String selectStatement, Properties overriddenProps, byte[] pTableCacheBytes) { - if(overriddenProps == null){ + if (overriddenProps == null){ throw new NullPointerException(); } - this.zkUrl = zkUrl; + this.jdbcUrl = jdbcUrl; this.scn = scn; this.tenantId = tenantId; this.selectStatement = selectStatement; @@ -53,8 +53,8 @@ class PhoenixDataSourceReadOptions implements Serializable { return scn; } - String getZkUrl() { - return zkUrl; + String getJdbcUrl() { + return jdbcUrl; } String getTenantId() { diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java index 8a34cae..6a7ee4d 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java +++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java @@ -57,9 +57,6 @@ import org.apache.spark.sql.execution.datasources.SparkJdbcUtil; import org.apache.spark.sql.types.StructType; import scala.collection.Iterator; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import 
static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; - public class PhoenixPartitionReader implements PartitionReader<InternalRow> { private final PhoenixInputPartition inputPartition; @@ -81,11 +78,10 @@ public class PhoenixPartitionReader implements PartitionReader<InternalRow> { } private QueryPlan getQueryPlan() throws SQLException { - String zkUrl = options.getZkUrl(); + String jdbcUrl = options.getJdbcUrl(); Properties overridingProps = getOverriddenPropsFromOptions(); overridingProps.put("phoenix.skip.system.tables.existence.check", Boolean.valueOf("true")); - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, overridingProps)) { PTable pTable = null; try { pTable = PTable.parseFrom(options.getPTableCacheBytes()); @@ -172,7 +168,7 @@ public class PhoenixPartitionReader implements PartitionReader<InternalRow> { @Override public void close() throws IOException { - if(resultSet != null) { + if (resultSet != null) { try { resultSet.close(); } catch (SQLException e) { diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java index 4e6af60..c72206e 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java +++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java @@ -51,14 +51,12 @@ import java.util.List; import java.util.Properties; import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; public class PhoenixScan implements Scan, Batch { private final StructType schema; private final CaseInsensitiveStringMap options; - private final String zkUrl; + private final String jdbcUrl; private final Properties overriddenProps; private PhoenixDataSourceReadOptions phoenixDataSourceOptions; private final String tableName; @@ -72,8 +70,8 @@ public class PhoenixScan implements Scan, Batch { this.options = options; this.whereClause = whereClause; this.overriddenProps = extractPhoenixHBaseConfFromOptions(options); - this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL); - tableName = options.get("table"); + this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options); + this.tableName = options.get("table"); } private void populateOverriddenProperties(){ @@ -82,7 +80,7 @@ public class PhoenixScan implements Scan, Batch { // Generate splits based off statistics, or just region splits? 
splitByStats = options.getBoolean( PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); - if(currentScnValue != null) { + if (currentScnValue != null) { overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); } if (tenantId != null){ @@ -109,7 +107,7 @@ public class PhoenixScan implements Scan, Batch { public InputPartition[] planInputPartitions() { populateOverriddenProperties(); try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { + jdbcUrl, overriddenProps)) { List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>( Arrays.asList(schema.names()))); final Statement statement = conn.createStatement(); @@ -151,7 +149,7 @@ public class PhoenixScan implements Scan, Batch { byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()). toByteArray(); phoenixDataSourceOptions = - new PhoenixDataSourceReadOptions(zkUrl, currentScnValue, + new PhoenixDataSourceReadOptions(jdbcUrl, currentScnValue, tenantId, selectStatement, overriddenProps, pTableCacheBytes); if (splitByStats) { diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java index b666bf1..5115f73 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java +++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.spark.sql.connector.writer; +import org.apache.phoenix.spark.sql.connector.PhoenixDataSource; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.spark.sql.connector.write.BatchWrite; @@ -29,7 +30,6 @@ import org.apache.spark.sql.types.StructType; import java.util.Map; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE; -import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.ZOOKEEPER_URL; import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER; import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; @@ -60,12 +60,12 @@ public class PhoenixBatchWrite implements BatchWrite { StructType schema) { String scn = options.get(CURRENT_SCN_VALUE); String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB); - String zkUrl = options.get(ZOOKEEPER_URL); + String jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options); String tableName = options.get("table"); boolean skipNormalizingIdentifier = Boolean.parseBoolean(options.getOrDefault(SKIP_NORMALIZING_IDENTIFIER, Boolean.toString(false))); return new PhoenixDataSourceWriteOptions.Builder() .setTableName(tableName) - .setZkUrl(zkUrl) + .setJdbcUrl(jdbcUrl) .setScn(scn) .setTenantId(tenantId) .setSchema(schema) diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java index 03baae1..adc8f37 100644 --- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java +++ 
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java
@@ -28,30 +28,30 @@ import java.util.Properties;
 class PhoenixDataSourceWriteOptions implements Serializable {
 
     private final String tableName;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final String tenantId;
     private final String scn;
     private final StructType schema;
     private final boolean skipNormalizingIdentifier;
     private final Properties overriddenProps;
 
-    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn,
+    private PhoenixDataSourceWriteOptions(String tableName, String jdbcUrl, String scn,
             String tenantId, StructType schema, boolean skipNormalizingIdentifier,
             Properties overriddenProps) {
         if (tableName == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("tableName must not be null");
         }
-        if (zkUrl == null) {
-            throw new NullPointerException();
+        if (jdbcUrl == null) {
+            throw new IllegalArgumentException("jdbcUrl must not be null");
         }
         if (schema == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("schema must not be null");
        }
         if (overriddenProps == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("overriddenProps must not be null");
         }
         this.tableName = tableName;
-        this.zkUrl = zkUrl;
+        this.jdbcUrl = jdbcUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.schema = schema;
@@ -63,8 +63,8 @@ class PhoenixDataSourceWriteOptions implements Serializable {
         return scn;
     }
 
-    String getZkUrl() {
-        return zkUrl;
+    String getJdbcUrl() {
+        return jdbcUrl;
     }
 
     String getTenantId() {
@@ -97,7 +97,7 @@
     static class Builder {
 
         private String tableName;
-        private String zkUrl;
+        private String jdbcUrl;
         private String scn;
         private String tenantId;
         private StructType schema;
@@ -109,8 +109,8 @@
             return this;
         }
 
-        Builder setZkUrl(String zkUrl) {
-            this.zkUrl = zkUrl;
+        Builder setJdbcUrl(String jdbcUrl) {
+            this.jdbcUrl = jdbcUrl;
             return this;
         }
 
@@ -140,7 +140,7 @@
         }
 
         PhoenixDataSourceWriteOptions build() {
-            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema,
+            return new PhoenixDataSourceWriteOptions(tableName, jdbcUrl, scn, tenantId, schema,
                     skipNormalizingIdentifier, overriddenProps);
         }
     }
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
index 33fe0f4..ea5338f 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
@@ -50,8 +50,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
 
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
@@ -64,7 +62,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
     private ExpressionEncoder<Row> encoder;
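 
     // The writer now opens its Phoenix connection directly from the JDBC URL carried in the
     // write options (instead of composing a "jdbc:phoenix:" URL from a ZooKeeper quorum),
     // applying whatever connection properties were supplied through those options.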
     PhoenixDataWriter(StructType schema, PhoenixDataSourceWriteOptions options) {
-        String zkUrl = options.getZkUrl();
+        String jdbcUrl = options.getJdbcUrl();
         Properties connectionProps = options.getEffectiveProps();
         this.schema = options.getSchema();
@@ -74,8 +72,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
         }
         encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind(
             scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(), SimpleAnalyzer$.MODULE$);
         try {
-            this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl,
-                    connectionProps);
+            this.conn = DriverManager.getConnection(jdbcUrl, connectionProps);
             List<String> colNames = new ArrayList<>(Arrays.asList(options.getSchema().names()));
             if (!options.skipNormalizingIdentifier()){
                 colNames = colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
diff --git a/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java b/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
index 9a06236..ae7e2bd 100644
--- a/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
+++ b/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector;
 
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.Test;
 
@@ -82,4 +83,30 @@ public class PhoenixDataSourceTest {
         assertTrue(extractPhoenixHBaseConfFromOptions(null).isEmpty());
     }
 
+    @Test
+    public void testUrlFallbackLogic() {
+        Map<String, String> props = new HashMap<>();
+
+        assertEquals(PhoenixRuntime.JDBC_PROTOCOL, PhoenixDataSource.getJdbcUrlFromOptions(props));
+
+        // The fallback logic does not validate the URL; it only checks whether the
+        // "jdbc:phoenix" prefix is already present
+        String NOTANURL = "notanurl";
+
+        props.put(PhoenixDataSource.JDBC_URL, NOTANURL);
+        assertEquals(NOTANURL, PhoenixDataSource.getJdbcUrlFromOptions(props));
+
+        props.remove(PhoenixDataSource.JDBC_URL);
+        props.put(PhoenixDataSource.ZOOKEEPER_URL, NOTANURL);
+        assertEquals(
+            PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + NOTANURL,
+            PhoenixDataSource.getJdbcUrlFromOptions(props));
+
+        props.put(PhoenixDataSource.ZOOKEEPER_URL,
+            PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + NOTANURL);
+        assertEquals(
+            PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + NOTANURL,
+            PhoenixDataSource.getJdbcUrlFromOptions(props));
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 1f28456..b4c4cb8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,37 +83,16 @@
     <!-- Profiling is not enabled in the repo. Placeholder.
    -->
     <jacocoArgLine></jacocoArgLine>
-    <phoenix-surefire.argLine>-enableassertions -Xmx${surefire.Xmx}
-      -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
-      -Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true
-      -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced
-      -Dio.netty.eventLoopThreads=3 -Duser.timezone="America/Los_Angeles"
-      -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./target/
-      "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"
-    </phoenix-surefire.argLine>
-    <phoenix-surefire.jdk8.tuning.flags>
-      -XX:NewRatio=4 -XX:SurvivorRatio=8 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC
-      -XX:+DisableExplicitGC -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled
-      -XX:+CMSScavengeBeforeRemark -XX:CMSInitiatingOccupancyFraction=68
-    </phoenix-surefire.jdk8.tuning.flags>
-    <phoenix-surefire.jdk11.flags>-Dorg.apache.hbase.thirdparty.io.netty.tryReflectionSetAccessible=true
-      --add-modules jdk.unsupported
-      --add-opens java.base/java.nio=ALL-UNNAMED
-      --add-opens java.base/sun.nio.ch=ALL-UNNAMED
-      --add-opens java.base/java.lang=ALL-UNNAMED
-      --add-opens java.base/jdk.internal.ref=ALL-UNNAMED
-      --add-opens java.base/java.lang.reflect=ALL-UNNAMED
-      --add-opens java.base/java.util=ALL-UNNAMED
-      --add-opens java.base/java.util.concurrent=ALL-UNNAMED
-      --add-exports java.base/jdk.internal.misc=ALL-UNNAMED
-      --add-exports java.security.jgss/sun.security.krb5=ALL-UNNAMED</phoenix-surefire.jdk11.flags>
-    <phoenix-surefire.jdk11.tuning.flags>
-      ${phoenix-surefire.jdk8.tuning.flags}
-    </phoenix-surefire.jdk11.tuning.flags>
+    <!-- Hard to read, but ScalaTest cannot handle a multiline argLine.
+      It cannot resolve @{} either. -->
+    <phoenix-surefire.argLine>-enableassertions -Xmx${surefire.Xmx} -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced -Dio.netty.eventLoopThreads=3 -Duser.timezone="America/Los_Angeles" -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./target/ "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path [...]
+    <phoenix-surefire.jdk8.tuning.flags>-XX:NewRatio=4 -XX:SurvivorRatio=8 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:CMSInitiatingOccupancyFraction=68</phoenix-surefire.jdk8.tuning.flags>
+    <phoenix-surefire.jdk11.flags>-Dorg.apache.hbase.thirdparty.io.netty.tryReflectionSetAccessible=true --add-modules jdk.unsupported --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-exports java.base [...]
+    <phoenix-surefire.jdk11.tuning.flags>${phoenix-surefire.jdk8.tuning.flags}</phoenix-surefire.jdk11.tuning.flags>
     <phoenix-surefire.jdk17.flags>--add-opens java.base/jdk.internal.util.random=ALL-UNNAMED</phoenix-surefire.jdk17.flags>
     <phoenix-surefire.jdk17.tuning.flags></phoenix-surefire.jdk17.tuning.flags>
     <!-- Surefire argLine defaults for Linux + JDK8 -->
-    <argLine>${phoenix-surefire.argLine} ${phoenix-surefire.jdk8.tuning.flags} @{jacocoArgLine}</argLine>
+    <argLine>${phoenix-surefire.argLine} ${phoenix-surefire.jdk8.tuning.flags}</argLine>
 
     <!-- Dependency versions -->
     <hive3.version>3.1.2</hive3.version>
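For reference, the user-visible effect of this change is the URL handling exercised by testUrlFallbackLogic above: the connection URL option is now optional and, when given, is a standard Phoenix JDBC URL. The snippet below is an illustrative sketch only; the option key strings "jdbcUrl" and "zkUrl" are assumed values of PhoenixDataSource.JDBC_URL and the older PhoenixDataSource.ZOOKEEPER_URL, so check those constants for the authoritative names.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("phoenix-jdbc-url-sketch").getOrCreate()

// 1. No URL option: the connector falls back to the bare "jdbc:phoenix" protocol,
//    so the cluster is resolved from the HBase/Phoenix configuration on the classpath.
val dfDefault = spark.read.format("phoenix").option("table", "TABLE1").load()

// 2. Explicit standard JDBC URL, passed to DriverManager unchanged.
val dfJdbc = spark.read.format("phoenix")
  .option("table", "TABLE1")
  .option("jdbcUrl", "jdbc:phoenix:zk1,zk2,zk3:2181") // assumed key for PhoenixDataSource.JDBC_URL
  .load()

// 3. Legacy ZooKeeper quorum option: still accepted, and prefixed with "jdbc:phoenix:"
//    unless the value already carries that prefix.
val dfZk = spark.read.format("phoenix")
  .option("table", "TABLE1")
  .option("zkUrl", "zk1,zk2,zk3:2181") // assumed key for PhoenixDataSource.ZOOKEEPER_URL
  .load()
```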