This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aedf30  PHOENIX-6633 Use standard JDBC URL in Spark Connector and make it optional
5aedf30 is described below

commit 5aedf308ffcaf76ba85bfc2efdb55a8fb72c991e
Author: Istvan Toth <st...@apache.org>
AuthorDate: Mon Sep 25 08:16:06 2023 +0200

    PHOENIX-6633 Use standard JDBC URL in Spark Connector and make it optional
---
 phoenix5-spark/README.md                           |  46 ++++----
 phoenix5-spark/pom.xml                             |  21 +++-
 .../org/apache/phoenix/spark/DataSourceApiIT.java  | 124 +++++++++++++++++----
 .../spark/datasource/v2/PhoenixDataSource.java     |  32 ++++++
 .../v2/reader/PhoenixDataSourceReadOptions.java    |  12 +-
 .../v2/reader/PhoenixDataSourceReader.java         |  24 ++--
 .../v2/reader/PhoenixInputPartitionReader.java     |  10 +-
 .../v2/writer/PhoenixDataSourceWriteOptions.java   |  28 ++---
 .../v2/writer/PhoenixDataSourceWriter.java         |  12 +-
 .../datasource/v2/writer/PhoenixDataWriter.java    |   7 +-
 .../org/apache/phoenix/spark/DataSourceApiIT.java  | 120 ++++++++++++++++----
 .../sql/connector/PhoenixTestingDataSource.java    |   8 +-
 phoenix5-spark3/README.md                          |  40 ++++---
 .../spark/sql/connector/PhoenixDataSource.java     |  38 +++++--
 .../reader/PhoenixDataSourceReadOptions.java       |  12 +-
 .../connector/reader/PhoenixPartitionReader.java   |  10 +-
 .../spark/sql/connector/reader/PhoenixScan.java    |  14 +--
 .../sql/connector/writer/PhoenixBatchWrite.java    |   6 +-
 .../writer/PhoenixDataSourceWriteOptions.java      |  28 ++---
 .../sql/connector/writer/PhoenixDataWriter.java    |   7 +-
 .../spark/sql/connector/PhoenixDataSourceTest.java |  27 +++++
 pom.xml                                            |  35 ++----
 22 files changed, 427 insertions(+), 234 deletions(-)

diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md
index 2540b73..f443cb0 100644
--- a/phoenix5-spark/README.md
+++ b/phoenix5-spark/README.md
@@ -33,7 +33,6 @@ Scala example:
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
 val spark = SparkSession
   .builder()
@@ -45,7 +44,7 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .options(Map("table" -> "TABLE1"))
   .load
 
 df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
@@ -60,8 +59,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
-
 public class PhoenixSparkRead {
     
     public static void main() throws Exception {
@@ -74,7 +71,6 @@ public class PhoenixSparkRead {
             .read()
             .format("phoenix")
             .option("table", "TABLE1")
-            .option(ZOOKEEPER_URL, "phoenix-server:2181")
             .load();
         df.createOrReplaceTempView("TABLE1");
     
@@ -108,7 +104,6 @@ you can load from an input table and save to an output table as a DataFrame as f
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
 val spark = SparkSession
   .builder()
@@ -120,14 +115,14 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .options(Map("table" -> "INPUT_TABLE"))
   .load
 
 // Save to OUTPUT_TABLE
 df.write
   .format("phoenix")
   .mode(SaveMode.Overwrite)
-  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .options(Map("table" -> "OUTPUT_TABLE"))
   .save()
 ```
 Java example:
@@ -139,8 +134,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SQLContext;
 
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
-
 public class PhoenixSparkWriteFromInputTable {
     
     public static void main() throws Exception {
@@ -153,7 +146,6 @@ public class PhoenixSparkWriteFromInputTable {
             .read()
             .format("phoenix")
             .option("table", "INPUT_TABLE")
-            .option(ZOOKEEPER_URL, "phoenix-server:2181")
             .load();
         
         // Save to OUTPUT_TABLE
@@ -161,7 +153,6 @@ public class PhoenixSparkWriteFromInputTable {
           .format("phoenix")
           .mode(SaveMode.Overwrite)
           .option("table", "OUTPUT_TABLE")
-          .option(ZOOKEEPER_URL, "phoenix-server:2181")
           .save();
         jsc.stop();
     }
@@ -187,7 +178,6 @@ you can save a dataframe from an RDD as follows in Scala:
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
 import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
 val spark = SparkSession
   .builder()
@@ -209,7 +199,7 @@ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
 df.write
   .format("phoenix")
-  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .options(Map("table" -> "OUTPUT_TABLE"))
   .mode(SaveMode.Overwrite)
   .save()
 ```
@@ -230,8 +220,6 @@ import org.apache.spark.sql.types.StructType;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
-
 public class PhoenixSparkWriteFromRDDWithSchema {
  
     public static void main() throws Exception {
@@ -260,7 +248,6 @@ public class PhoenixSparkWriteFromRDDWithSchema {
             .format("phoenix")
             .mode(SaveMode.Overwrite)
             .option("table", "OUTPUT_TABLE")
-            .option(ZOOKEEPER_URL,  "phoenix-server:2181")
             .save();
   
         jsc.stop();
@@ -270,13 +257,21 @@ public class PhoenixSparkWriteFromRDDWithSchema {
 
 ## Notes
 
-- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
-  instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
-- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+- The DataSourceV2 based "phoenix" data source accepts the `"jdbcUrl"` parameter, which can be
+used to override the default HBase/Phoenix instance specified in hbase-site.xml. It also accepts
+the deprecated `zkUrl` parameter for backwards compatibility. If neither is specified,
+it falls back to the connection defined by hbase-site.xml.
+- `"jdbcUrl"` expects a full Phoenix JDBC URL, e.g. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkPort",
+while `"zkUrl"` expects the ZK quorum only, e.g. "zkHost:zkPort".
+- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
+instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
+The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
+only `"zkUrl"`.
+- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and
+`saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow
 optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
-as well as an optional `zkUrl` parameter for the Phoenix connection URL.
-- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
-in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
+as well as an optional `zkUrl` parameter.
+
 - As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
 to executors as a comma-separated list against the key `phoenixConfigs` i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex:
     ```scala
@@ -284,10 +279,11 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
       .sqlContext
       .read
       .format("phoenix")
-      .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+      .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
       .load;
     ```
-    This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
+    This list of properties is parsed and populated into a properties map which is passed to
+    `DriverManager.getConnection(connString, propsMap)`.
     Note that the same property values will be used for both the driver and all executors and
     these configurations are used each time a connection is made (both on the driver and executors).
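
To make the new URL handling concrete, here is a minimal Java sketch (not part of this diff; `spark` is assumed to be an existing SparkSession and the host names are placeholders) of the three ways the DataSourceV2 connector can now locate a cluster:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JdbcUrlOptionsSketch {
    static void readExamples(SparkSession spark) {
        // Preferred: full Phoenix JDBC URL via the new "jdbcUrl" option
        Dataset<Row> viaJdbcUrl = spark.read().format("phoenix")
                .option("table", "TABLE1")
                .option("jdbcUrl", "jdbc:phoenix:phoenix-server:2181")
                .load();

        // Deprecated: bare ZooKeeper quorum via "zkUrl"; the connector prepends "jdbc:phoenix:"
        Dataset<Row> viaZkUrl = spark.read().format("phoenix")
                .option("table", "TABLE1")
                .option("zkUrl", "phoenix-server:2181")
                .load();

        // Neither option: fall back to the connection defined by hbase-site.xml
        Dataset<Row> viaDefault = spark.read().format("phoenix")
                .option("table", "TABLE1")
                .load();
    }
}
```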
 
diff --git a/phoenix5-spark/pom.xml b/phoenix5-spark/pom.xml
index 9cd8ff3..35b1d83 100644
--- a/phoenix5-spark/pom.xml
+++ b/phoenix5-spark/pom.xml
@@ -446,6 +446,25 @@
 
   <build>
       <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>build-helper-maven-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>add-test-source</id>
+              <phase>generate-sources</phase>
+              <goals>
+                <goal>add-test-source</goal>
+              </goals>
+              <configuration>
+                <sources>
+                  <source>src/it/java</source>
+                  <source>src/it/scala</source>
+                </sources>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
         <plugin>
           <artifactId>maven-dependency-plugin</artifactId>
           <configuration>
@@ -513,7 +532,7 @@
                   to https://github.com/junit-team/junit4/issues/1223 -->
                 <parallel>false</parallel>
                 <tagsToExclude>Integration-Test</tagsToExclude>
-                <argLine>--XX:ReservedCodeCacheSize=512m ${argLine}</argLine>
+                <argLine>-XX:ReservedCodeCacheSize=512m ${argLine}</argLine>
               </configuration>
             </execution>
           </executions>
diff --git 
a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java 
b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
similarity index 50%
copy from 
phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
copy to phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
index dbcf45f..c6a4465 100644
--- 
a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
@@ -17,7 +17,11 @@
  */
 package org.apache.phoenix.spark;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
+import org.apache.phoenix.util.InstanceResolver;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
@@ -25,13 +29,17 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import java.sql.*;
 import java.util.Arrays;
 
-import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.JDBC_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.junit.Assert.*;
 
 public class DataSourceApiIT extends ParallelStatsDisabledIT {
@@ -40,46 +48,114 @@ public class DataSourceApiIT extends 
ParallelStatsDisabledIT {
         super();
     }
 
+    @BeforeClass
+    public static synchronized void setupInstanceResolver() {
+        //FIXME This should be called whenever we set up a miniCluster, deep 
in BaseTest
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo in the tool doesn't try to pull a 
default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new 
ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return new Configuration(config);
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(config);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+    }
+
     @Test
-    public void basicWriteTest() throws SQLException {
+    public void basicWriteAndReadBackTest() throws SQLException {
         SparkConf sparkConf = new 
SparkConf().setMaster("local").setAppName("phoenix-test");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         String tableName = generateUniqueName();
 
         try (Connection conn = DriverManager.getConnection(getUrl());
-             Statement stmt = conn.createStatement()){
-            stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER 
PRIMARY KEY, v1 VARCHAR)");
+                Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate(
+                "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 
VARCHAR)");
         }
 
-        try(SparkSession spark = sqlContext.sparkSession()) {
-
-            StructType schema = new StructType(new StructField[]{
-                    new StructField("id", DataTypes.IntegerType, false, 
Metadata.empty()),
-                    new StructField("v1", DataTypes.StringType, false, 
Metadata.empty())
-            });
-
-            Dataset<Row> df = spark.createDataFrame(
-                    Arrays.asList(
-                            RowFactory.create(1, "x")),
-                    schema);
-
-            df.write()
-                    .format("phoenix")
-                    .mode(SaveMode.Append)
-                    .option("table", tableName)
-                    .option(ZOOKEEPER_URL, getUrl())
-                    .save();
+        try (SparkSession spark = sqlContext.sparkSession()) {
+
+            StructType schema =
+                    new StructType(new StructField[] {
+                            new StructField("id", DataTypes.IntegerType, 
false, Metadata.empty()),
+                            new StructField("v1", DataTypes.StringType, false, 
Metadata.empty()) });
+
+            // Use old zkUrl
+            Dataset<Row> df1 =
+                    spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(1, "x")),
+                        schema);
+
+            df1.write().format("phoenix").mode(SaveMode.Overwrite)
+            .option("table", tableName)
+            .option(ZOOKEEPER_URL, getUrl())
+            .save();
+
+            // Use jdbcUrl
+            Dataset<Row> df2 =
+                    spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(2, "x")),
+                        schema);
+
+            df2.write().format("phoenix").mode(SaveMode.Overwrite)
+                .option("table", tableName)
+                .option(JDBC_URL, JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + 
getUrl())
+                .save();
+
+            // Use default from hbase-site.xml
+            Dataset<Row> df3 =
+                    spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(3, "x")),
+                        schema);
+
+            df3.write().format("phoenix").mode(SaveMode.Overwrite)
+                .option("table", tableName)
+                .save();
 
             try (Connection conn = DriverManager.getConnection(getUrl());
-                 Statement stmt = conn.createStatement()) {
+                    Statement stmt = conn.createStatement()) {
                 ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
                 assertTrue(rs.next());
                 assertEquals(1, rs.getInt(1));
                 assertEquals("x", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals("x", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertEquals("x", rs.getString(2));
                 assertFalse(rs.next());
             }
 
+            Dataset df1Read = spark.read().format("phoenix")
+                    .option("table", tableName)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+
+            assertEquals(3l, df1Read.count());
+
+            // Use jdbcUrl
+            Dataset df2Read = spark.read().format("phoenix")
+                    .option("table", tableName)
+                    .option(PhoenixDataSource.JDBC_URL,
+                                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + 
getUrl())
+                    .load();
+
+            assertEquals(3l, df2Read.count());
+
+            // Use default
+            Dataset df3Read = spark.read().format("phoenix")
+                    .option("table", tableName)
+                    .load();
+
+            assertEquals(3l, df3Read.count());
 
         } finally {
             jsc.stop();
@@ -116,7 +192,7 @@ public class DataSourceApiIT extends 
ParallelStatsDisabledIT {
 
             df.write()
                     .format("phoenix")
-                    .mode(SaveMode.Append)
+                    .mode(SaveMode.Overwrite)
                     .option("table", tableName)
                     .option(ZOOKEEPER_URL, getUrl())
                     .save();
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index 792e8f8..de773d5 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -34,6 +34,9 @@ import 
org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
 
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
 /**
  * Implements the DataSourceV2 api to read and write from Phoenix tables
  */
@@ -41,7 +44,9 @@ public class PhoenixDataSource  implements DataSourceV2,  
ReadSupport, WriteSupp
 
     private static final Logger logger = 
LoggerFactory.getLogger(PhoenixDataSource.class);
     public static final String SKIP_NORMALIZING_IDENTIFIER = 
"skipNormalizingIdentifier";
+    @Deprecated
     public static final String ZOOKEEPER_URL = "zkUrl";
+    public static final String JDBC_URL = "jdbcUrl";
     public static final String PHOENIX_CONFIGS = "phoenixconfigs";
 
     @Override
@@ -55,6 +60,33 @@ public class PhoenixDataSource  implements DataSourceV2,  
ReadSupport, WriteSupp
         return Optional.of(new PhoenixDataSourceWriter(mode, schema, options));
     }
 
+    public static String getJdbcUrlFromOptions(final DataSourceOptions options) {
+        if (options.get(JDBC_URL).orElse(null) != null
+                && options.get(ZOOKEEPER_URL).orElse(null) != null) {
+            throw new RuntimeException("If " + JDBC_URL + " is specified, then  " + ZOOKEEPER_URL
+                    + " must not be specified");
+        }
+
+        String jdbcUrl = options.get(JDBC_URL).orElse(null);
+        String zkUrl = options.get(ZOOKEEPER_URL).orElse(null);
+        // Backward compatibility logic
+        if (jdbcUrl == null) {
+            if (zkUrl != null) {
+                if (zkUrl.startsWith(JDBC_PROTOCOL)) {
+                    // full URL specified, use it
+                    jdbcUrl = zkUrl;
+                } else {
+                    // backwards compatibility, assume ZK, and missing protocol
+                    // Don't use the specific protocol, as we need to work with older releases.
+                    jdbcUrl = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl;
+                }
+            } else {
+                jdbcUrl = JDBC_PROTOCOL;
+            }
+        }
+        return jdbcUrl;
+    }
+
     /**
      * Extract HBase and Phoenix properties that need to be set in both the driver and workers.
      * We expect these properties to be passed against the key
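
The backward-compatibility rules implemented by `getJdbcUrlFromOptions` above can be illustrated with a small, hypothetical sketch (not part of this commit; it assumes `PhoenixRuntime.JDBC_PROTOCOL` is `"jdbc:phoenix"` and `JDBC_PROTOCOL_SEPARATOR` is `":"`, and the class name below is invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
import org.apache.spark.sql.sources.v2.DataSourceOptions;

public class JdbcUrlResolutionSketch {
    public static void main(String[] args) {
        // "jdbcUrl" is taken verbatim
        Map<String, String> jdbc = new HashMap<>();
        jdbc.put("jdbcUrl", "jdbc:phoenix:zk1,zk2:2181");
        System.out.println(PhoenixDataSource.getJdbcUrlFromOptions(new DataSourceOptions(jdbc)));
        // expected: jdbc:phoenix:zk1,zk2:2181

        // legacy "zkUrl" without a protocol gets "jdbc:phoenix:" prepended
        Map<String, String> zk = new HashMap<>();
        zk.put("zkUrl", "zk1,zk2:2181");
        System.out.println(PhoenixDataSource.getJdbcUrlFromOptions(new DataSourceOptions(zk)));
        // expected: jdbc:phoenix:zk1,zk2:2181

        // neither option: fall back to the bare protocol, i.e. the hbase-site.xml defaults
        System.out.println(PhoenixDataSource.getJdbcUrlFromOptions(new DataSourceOptions(new HashMap<>())));
        // expected: jdbc:phoenix

        // specifying both "jdbcUrl" and "zkUrl" throws a RuntimeException
    }
}
```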
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 003a34d..3d14292 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -23,18 +23,18 @@ import java.util.Properties;
 class PhoenixDataSourceReadOptions implements Serializable {
 
     private final String tenantId;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final String scn;
     private final String selectStatement;
     private final Properties overriddenProps;
     private final byte[] pTableCacheBytes;
 
-    PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+    PhoenixDataSourceReadOptions(String jdbcUrl, String scn, String tenantId,
             String selectStatement, Properties overriddenProps, byte[] 
pTableCacheBytes) {
-        if(overriddenProps == null){
+        if (overriddenProps == null){
             throw new NullPointerException();
         }
-        this.zkUrl = zkUrl;
+        this.jdbcUrl = jdbcUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
@@ -50,8 +50,8 @@ class PhoenixDataSourceReadOptions implements Serializable {
         return scn;
     }
 
-    String getZkUrl() {
-        return zkUrl;
+    String getJdbcUrl() {
+        return jdbcUrl;
     }
 
     String getTenantId() {
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index b839548..a7aca22 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -61,16 +61,12 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 
-import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-
 public class PhoenixDataSourceReader implements DataSourceReader, 
SupportsPushDownFilters,
         SupportsPushDownRequiredColumns {
 
     private final DataSourceOptions options;
     private final String tableName;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
 
@@ -83,14 +79,12 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
         if (!options.tableName().isPresent()) {
             throw new RuntimeException("No Phoenix option " + 
DataSourceOptions.TABLE_KEY + " defined");
         }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + 
PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+
         this.options = options;
         this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
+        this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
         this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
-        this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
+        this.overriddenProps = 
PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options);
         setSchema();
     }
 
@@ -98,8 +92,7 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
      * Sets the schema using all the table columns before any column pruning 
has been done
      */
     private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overriddenProps)) {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
overriddenProps)) {
             List<ColumnInfo> columnInfos = 
PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = 
JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
             schema = 
SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
@@ -134,14 +127,13 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
         // Generate splits based off statistics, or just region splits?
         boolean splitByStats = options.getBoolean(
                 PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, 
PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
+        if (currentScnValue.isPresent()) {
             overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
currentScnValue.get());
         }
         if (tenantId.isPresent()){
             overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, 
tenantId.get());
         }
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overriddenProps)) {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
overriddenProps)) {
             List<ColumnInfo> columnInfos = 
PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
                 Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
@@ -188,7 +180,7 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
                 byte[] pTableCacheBytes = 
PTableImpl.toProto(queryPlan.getTableRef().getTable()).
                     toByteArray();
                 PhoenixDataSourceReadOptions phoenixDataSourceOptions =
-                     new PhoenixDataSourceReadOptions(zkUrl, 
currentScnValue.orElse(null),
+                     new PhoenixDataSourceReadOptions(jdbcUrl, 
currentScnValue.orElse(null),
                             tenantId.orElse(null), selectStatement, 
overriddenProps,
                          pTableCacheBytes);
                 if (splitByStats) {
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index a58cfa0..a7f6240 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -62,9 +62,6 @@ import org.apache.spark.sql.types.StructType;
 
 import scala.collection.Iterator;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-
 public class PhoenixInputPartitionReader implements 
InputPartitionReader<InternalRow>  {
 
     private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
@@ -89,7 +86,7 @@ public class PhoenixInputPartitionReader implements 
InputPartitionReader<Interna
     private QueryPlan getQueryPlan() throws SQLException {
         String scn = options.getScn();
         String tenantId = options.getTenantId();
-        String zkUrl = options.getZkUrl();
+        String jdbcUrl = options.getJdbcUrl();
         Properties overridingProps = getOverriddenPropsFromOptions();
         if (scn != null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
@@ -97,8 +94,7 @@ public class PhoenixInputPartitionReader implements 
InputPartitionReader<Interna
         if (tenantId != null) {
             overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overridingProps)) {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
overridingProps)) {
             PTable pTable = null;
             try {
                 pTable = PTable.parseFrom(options.getPTableCacheBytes());
@@ -185,7 +181,7 @@ public class PhoenixInputPartitionReader implements 
InputPartitionReader<Interna
 
     @Override
     public void close() throws IOException {
-        if(resultSet != null) {
+        if (resultSet != null) {
             try {
                 resultSet.close();
             } catch (SQLException e) {
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index cc7f459..7f1bb5f 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -25,30 +25,30 @@ import java.util.Properties;
 class PhoenixDataSourceWriteOptions implements Serializable {
 
     private final String tableName;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final String tenantId;
     private final String scn;
     private final StructType schema;
     private final boolean skipNormalizingIdentifier;
     private final Properties overriddenProps;
 
-    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, 
String scn,
+    private PhoenixDataSourceWriteOptions(String tableName, String jdbcUrl, 
String scn,
             String tenantId, StructType schema, boolean 
skipNormalizingIdentifier,
             Properties overriddenProps) {
         if (tableName == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("tableName must not be null");
         }
-        if (zkUrl == null) {
-            throw new NullPointerException();
+        if (jdbcUrl == null) {
+            throw new IllegalArgumentException("jdbcUrl must not be null");
         }
         if (schema == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("schema must not be null");
         }
         if (overriddenProps == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("overriddenProps must not be 
null");
         }
         this.tableName = tableName;
-        this.zkUrl = zkUrl;
+        this.jdbcUrl = jdbcUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.schema = schema;
@@ -60,8 +60,8 @@ class PhoenixDataSourceWriteOptions implements Serializable {
         return scn;
     }
 
-    String getZkUrl() {
-        return zkUrl;
+    String getJdbcUrl() {
+        return jdbcUrl;
     }
 
     String getTenantId() {
@@ -86,7 +86,7 @@ class PhoenixDataSourceWriteOptions implements Serializable {
 
     static class Builder {
         private String tableName;
-        private String zkUrl;
+        private String jdbcUrl;
         private String scn;
         private String tenantId;
         private StructType schema;
@@ -98,8 +98,8 @@ class PhoenixDataSourceWriteOptions implements Serializable {
             return this;
         }
 
-        Builder setZkUrl(String zkUrl) {
-            this.zkUrl = zkUrl;
+        Builder setJdbcUrl(String jdbcUrl) {
+            this.jdbcUrl = jdbcUrl;
             return this;
         }
 
@@ -129,7 +129,7 @@ class PhoenixDataSourceWriteOptions implements Serializable 
{
         }
 
         PhoenixDataSourceWriteOptions build() {
-            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, 
tenantId, schema,
+            return new PhoenixDataSourceWriteOptions(tableName, jdbcUrl, scn, 
tenantId, schema,
                     skipNormalizingIdentifier, overriddenProps);
         }
     }
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java
index 31a3065..08fe886 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -28,8 +29,6 @@ import org.apache.spark.sql.types.StructType;
 
 import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
 import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
-import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
-import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
 public class PhoenixDataSourceWriter implements DataSourceWriter {
 
@@ -42,9 +41,6 @@ public class PhoenixDataSourceWriter implements 
DataSourceWriter {
         if (!options.tableName().isPresent()) {
             throw new RuntimeException("No Phoenix option " + 
DataSourceOptions.TABLE_KEY + " defined");
         }
-        if (!options.get(ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + 
" defined");
-        }
         this.options = createPhoenixDataSourceWriteOptions(options, schema);
     }
 
@@ -74,16 +70,16 @@ public class PhoenixDataSourceWriter implements 
DataSourceWriter {
                                                                               
StructType schema) {
         String scn = options.get(CURRENT_SCN_VALUE).orElse(null);
         String tenantId = 
options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
-        String zkUrl = options.get(ZOOKEEPER_URL).get();
+        String jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
         boolean skipNormalizingIdentifier = 
options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
         return new PhoenixDataSourceWriteOptions.Builder()
                 .setTableName(options.tableName().get())
-                .setZkUrl(zkUrl)
+                .setJdbcUrl(jdbcUrl)
                 .setScn(scn)
                 .setTenantId(tenantId)
                 .setSchema(schema)
                 .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
-                
.setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
+                
.setOverriddenProps(PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options))
                 .build();
     }
 }
diff --git 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index fd85d66..fabbe18 100644
--- 
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ 
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -51,8 +51,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
 
 import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
 import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
@@ -67,7 +65,7 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
     PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
         String scn = options.getScn();
         String tenantId = options.getTenantId();
-        String zkUrl = options.getZkUrl();
+        String jdbcUrl = options.getJdbcUrl();
         Properties overridingProps = options.getOverriddenProps();
         if (scn != null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
@@ -84,8 +82,7 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
         }
         encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind( 
scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(),
 SimpleAnalyzer$.MODULE$);
         try {
-            this.conn = DriverManager.getConnection(JDBC_PROTOCOL + 
JDBC_PROTOCOL_SEPARATOR + zkUrl,
-                    overridingProps);
+            this.conn = DriverManager.getConnection(jdbcUrl, overridingProps);
             List<String> colNames =  new 
ArrayList<>(Arrays.asList(options.getSchema().names()));
             if (!options.skipNormalizingIdentifier()){
                 colNames = 
colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
diff --git 
a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java 
b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
index dbcf45f..774454f 100644
--- 
a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
+++ 
b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
@@ -17,7 +17,11 @@
  */
 package org.apache.phoenix.spark;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.spark.sql.connector.PhoenixDataSource;
+import org.apache.phoenix.util.InstanceResolver;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
@@ -25,13 +29,17 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import java.sql.*;
 import java.util.Arrays;
 
+import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.JDBC_URL;
 import static org.apache.phoenix.spark.sql.connector.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.junit.Assert.*;
 
 public class DataSourceApiIT extends ParallelStatsDisabledIT {
@@ -40,46 +48,114 @@ public class DataSourceApiIT extends 
ParallelStatsDisabledIT {
         super();
     }
 
+    @BeforeClass
+    public static synchronized void setupInstanceResolver() {
+        //FIXME This should be called whenever we set up a miniCluster, deep 
in BaseTest
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo in the tool doesn't try to pull a 
default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new 
ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return new Configuration(config);
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(config);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+    }
+
     @Test
-    public void basicWriteTest() throws SQLException {
+    public void basicWriteAndReadBackTest() throws SQLException {
         SparkConf sparkConf = new 
SparkConf().setMaster("local").setAppName("phoenix-test");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         String tableName = generateUniqueName();
 
         try (Connection conn = DriverManager.getConnection(getUrl());
-             Statement stmt = conn.createStatement()){
-            stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER 
PRIMARY KEY, v1 VARCHAR)");
+                Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate(
+                "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 
VARCHAR)");
         }
 
-        try(SparkSession spark = sqlContext.sparkSession()) {
-
-            StructType schema = new StructType(new StructField[]{
-                    new StructField("id", DataTypes.IntegerType, false, 
Metadata.empty()),
-                    new StructField("v1", DataTypes.StringType, false, 
Metadata.empty())
-            });
-
-            Dataset<Row> df = spark.createDataFrame(
-                    Arrays.asList(
-                            RowFactory.create(1, "x")),
-                    schema);
-
-            df.write()
-                    .format("phoenix")
-                    .mode(SaveMode.Append)
-                    .option("table", tableName)
-                    .option(ZOOKEEPER_URL, getUrl())
-                    .save();
+        try (SparkSession spark = sqlContext.sparkSession()) {
+
+            StructType schema =
+                    new StructType(new StructField[] {
+                            new StructField("id", DataTypes.IntegerType, 
false, Metadata.empty()),
+                            new StructField("v1", DataTypes.StringType, false, 
Metadata.empty()) });
+
+            // Use old zkUrl
+            Dataset<Row> df1 =
+                    spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(1, "x")),
+                        schema);
+
+            df1.write().format("phoenix").mode(SaveMode.Append)
+            .option("table", tableName)
+            .option(ZOOKEEPER_URL, getUrl())
+            .save();
+
+            // Use jdbcUrl
+            Dataset<Row> df2 =
+                    spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(2, "x")),
+                        schema);
+
+            df2.write().format("phoenix").mode(SaveMode.Append)
+                .option("table", tableName)
+                .option(JDBC_URL, JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + 
getUrl())
+                .save();
+
+            // Use default from hbase-site.xml
+            Dataset<Row> df3 =
+                    spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(3, "x")),
+                        schema);
+
+            df3.write().format("phoenix").mode(SaveMode.Append)
+                .option("table", tableName)
+                .save();
 
             try (Connection conn = DriverManager.getConnection(getUrl());
-                 Statement stmt = conn.createStatement()) {
+                    Statement stmt = conn.createStatement()) {
                 ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
                 assertTrue(rs.next());
                 assertEquals(1, rs.getInt(1));
                 assertEquals("x", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals("x", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertEquals("x", rs.getString(2));
                 assertFalse(rs.next());
             }
 
+            Dataset df1Read = spark.read().format("phoenix")
+                    .option("table", tableName)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+
+            assertEquals(3l, df1Read.count());
+
+            // Use jdbcUrl
+            Dataset df2Read = spark.read().format("phoenix")
+                    .option("table", tableName)
+                    .option(PhoenixDataSource.JDBC_URL,
+                                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + 
getUrl())
+                    .load();
+
+            assertEquals(3l, df2Read.count());
+
+            // Use default
+            Dataset df3Read = spark.read().format("phoenix")
+                    .option("table", tableName)
+                    .load();
+
+            assertEquals(3l, df3Read.count());
 
         } finally {
             jsc.stop();
diff --git 
a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
 
b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
index cea6678..4860154 100644
--- 
a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
+++ 
b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
@@ -34,9 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-
 public class PhoenixTestingDataSource extends PhoenixDataSource {
 
     public static final String TEST_SOURCE =
@@ -45,11 +42,10 @@ public class PhoenixTestingDataSource extends 
PhoenixDataSource {
     @Override
     public StructType inferSchema(CaseInsensitiveStringMap options) {
         String tableName = options.get("table");
-        String zkUrl = options.get(ZOOKEEPER_URL);
+        String jdbcUrl = getJdbcUrlFromOptions(options);
         boolean dateAsTimestamp = 
Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", 
Boolean.toString(false)));
         Properties overriddenProps = 
extractPhoenixHBaseConfFromOptions(options);
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overriddenProps)) {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
overriddenProps)) {
             List<ColumnInfo> columnInfos = 
PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = 
JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
             schema = 
SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 40e769b..824886d 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -54,7 +54,7 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181"))
+  .options(Map("table" -> "TABLE1"))
   .load
 
 df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
@@ -81,7 +81,6 @@ public class PhoenixSparkRead {
             .read()
             .format("phoenix")
             .option("table", "TABLE1")
-            .option("zkUrl", "phoenix-server:2181")
             .load();
         df.createOrReplaceTempView("TABLE1");
     
@@ -126,14 +125,14 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "INPUT_TABLE", "zkUrl" -> "phoenix-server:2181"))
+  .options(Map("table" -> "INPUT_TABLE"))
   .load
 
 // Save to OUTPUT_TABLE
 df.write
   .format("phoenix")
   .mode(SaveMode.Append)
-  .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181"))
+  .options(Map("table" -> "OUTPUT_TABLE"))
   .save()
 ```
 Java example:
@@ -157,7 +156,6 @@ public class PhoenixSparkWriteFromInputTable {
             .read()
             .format("phoenix")
             .option("table", "INPUT_TABLE")
-            .option("zkUrl", "phoenix-server:2181")
             .load();
         
         // Save to OUTPUT_TABLE
@@ -165,7 +163,6 @@ public class PhoenixSparkWriteFromInputTable {
           .format("phoenix")
           .mode(SaveMode.Append)
           .option("table", "OUTPUT_TABLE")
-          .option("zkUrl", "phoenix-server:2181")
           .save();
         jsc.stop();
     }
@@ -212,7 +209,7 @@ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
 df.write
   .format("phoenix")
-  .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181"))
+  .options(Map("table" -> "OUTPUT_TABLE"))
   .mode(SaveMode.Append)
   .save()
 ```
@@ -261,7 +258,6 @@ public class PhoenixSparkWriteFromRDDWithSchema {
             .format("phoenix")
             .mode(SaveMode.Append)
             .option("table", "OUTPUT_TABLE")
-            .option("zkUrl",  "phoenix-server:2181")
             .save();
   
         jsc.stop();
@@ -271,13 +267,21 @@ public class PhoenixSparkWriteFromRDDWithSchema {
 
 ## Notes
 
-- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
-  instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
-- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+- The DataSourceV2 based "phoenix" data source accepts the `"jdbcUrl"` parameter, which can be
+used to override the default HBase/Phoenix instance specified in hbase-site.xml. It also accepts
+the deprecated `zkUrl` parameter for backwards compatibility. If neither is specified,
+it falls back to the connection defined by hbase-site.xml.
+- `"jdbcUrl"` expects a full Phoenix JDBC URL, e.g. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkPort",
+while `"zkUrl"` expects the ZK quorum only, e.g. "zkHost:zkPort".
+- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
+instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
+The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
+only `"zkUrl"`.
+- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and
+`saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow
 optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
-as well as an optional `zkUrl` parameter for the Phoenix connection URL.
-- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
-in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
+as well as an optional `zkUrl` parameter.
+
 - As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
 to executors as a comma-separated list against the key `phoenixConfigs` i.e (PhoenixDataSource.PHOENIX_CONFIGS), for ex:
     ```scala
@@ -285,7 +289,7 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
       .sqlContext
       .read
       .format("phoenix")
-      .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+      .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
       .load;
     ```
     This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
@@ -299,12 +303,6 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
 create the DataFrame or RDD directly if you need fine-grained configuration.
 - No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
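
For the Spark 3 connector the same option change applies to writes; a minimal Java write sketch (not part of the README; `df` is assumed to be an existing `Dataset<Row>` whose columns match OUTPUT_TABLE, and the URL is a placeholder):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PhoenixWriteSketch {
    static void writeToPhoenix(Dataset<Row> df) {
        df.write()
          .format("phoenix")
          .mode(SaveMode.Append)
          .option("table", "OUTPUT_TABLE")
          // "jdbcUrl" is optional; omit it to fall back to hbase-site.xml
          .option("jdbcUrl", "jdbc:phoenix:phoenix-server:2181")
          .save();
    }
}
```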
 
-## Limitations of the Spark3 connector comapred to the Spark2 Connector
-
-- Non-uppercase column names cannot be used for mapping DataFrames. (PHOENIX-6668)
-- When writing to a DataFrame, every SQL column in the table must be specified. (PHOENIX-6667)
-
-
 ## Deprecated Usages
 
 ### Load as a DataFrame directly using a Configuration object
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
index 4fc5ad4..c3c0ade 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
@@ -48,23 +48,20 @@ public class PhoenixDataSource implements TableProvider, 
DataSourceRegister {
 
     private static final Logger logger = 
LoggerFactory.getLogger(PhoenixDataSource.class);
     public static final String SKIP_NORMALIZING_IDENTIFIER = 
"skipNormalizingIdentifier";
+    @Deprecated
     public static final String ZOOKEEPER_URL = "zkUrl";
+    public static final String JDBC_URL = "jdbcUrl";
     public static final String PHOENIX_CONFIGS = "phoenixconfigs";
     protected StructType schema;
-    private CaseInsensitiveStringMap options;
 
     @Override
     public StructType inferSchema(CaseInsensitiveStringMap options){
         if (options.get("table") == null) {
             throw new RuntimeException("No Phoenix option " + "Table" + " 
defined");
         }
-        if (options.get(ZOOKEEPER_URL) == null) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + 
" defined");
-        }
 
-        this.options = options;
+        String jdbcUrl = getJdbcUrlFromOptions(options);
         String tableName = options.get("table");
-        String zkUrl = options.get(ZOOKEEPER_URL);
         String tenant = options.get(PhoenixRuntime.TENANT_ID_ATTRIB);
         boolean dateAsTimestamp = 
Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", 
Boolean.toString(false)));
         Properties overriddenProps = 
extractPhoenixHBaseConfFromOptions(options);
@@ -75,8 +72,7 @@ public class PhoenixDataSource implements TableProvider, 
DataSourceRegister {
         /**
          * Sets the schema using all the table columns before any column 
pruning has been done
          */
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overriddenProps)) {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
overriddenProps)) {
             List<ColumnInfo> columnInfos = 
PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = 
JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
             schema = 
SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
@@ -87,6 +83,32 @@ public class PhoenixDataSource implements TableProvider, 
DataSourceRegister {
         return schema;
     }
 
+    public static String getJdbcUrlFromOptions(Map<String, String> options) {
+        if (options.get(JDBC_URL) != null && options.get(ZOOKEEPER_URL) != null) {
+            throw new RuntimeException("If " + JDBC_URL + " is specified, then 
 " + ZOOKEEPER_URL
+                    + " must not be specified");
+        }
+
+        String jdbcUrl = options.get(JDBC_URL);
+        String zkUrl = options.get(ZOOKEEPER_URL);
+        // Backward compatibility logic
+        if (jdbcUrl == null) {
+            if (zkUrl != null) {
+                if (zkUrl.startsWith(JDBC_PROTOCOL)) {
+                    // full URL specified, use it
+                    jdbcUrl = zkUrl;
+                } else {
+                    // backwards compatibility, assume ZK, and missing protocol
+                    // Don't use the specific protocol, as we need to work 
with older releases.
+                    jdbcUrl = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl;
+                }
+            } else {
+                jdbcUrl = JDBC_PROTOCOL;
+            }
+        }
+        return jdbcUrl;
+    }
+
     @Override
     public Table getTable( StructType schema, Transform[] transforms, 
Map<String, String> properties)
     {
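A read-path usage sketch of the URL resolution implemented above (host names are placeholders): `jdbcUrl` takes a full Phoenix JDBC URL, the deprecated `zkUrl` is still accepted and gets the `jdbc:phoenix:` prefix added when it is missing, and omitting both falls back to the bare `jdbc:phoenix` URL, i.e. the client configuration on the classpath.

```scala
// Minimal sketch, assuming a SparkSession named `spark` and a hypothetical
// ZooKeeper quorum zk1,zk2,zk3:2181.
val df = spark.read
  .format("phoenix")
  .option("table", "TABLE1")
  .option("jdbcUrl", "jdbc:phoenix:zk1,zk2,zk3:2181") // preferred: standard JDBC URL
  .load()

// Deprecated but still resolved by the fallback logic:
//   .option("zkUrl", "zk1,zk2,zk3:2181")   -> "jdbc:phoenix:zk1,zk2,zk3:2181"
// With neither option, the URL resolves to "jdbc:phoenix".
```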
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
index 4d49150..050663d 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
@@ -25,19 +25,19 @@ import java.util.Properties;
 class PhoenixDataSourceReadOptions implements Serializable {
 
     private final String tenantId;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final String scn;
     private final String selectStatement;
     private final Properties overriddenProps;
     private final byte[] pTableCacheBytes;
 
-    PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+    PhoenixDataSourceReadOptions(String jdbcUrl, String scn, String tenantId,
                                  String selectStatement, Properties 
overriddenProps,
         byte[] pTableCacheBytes) {
-        if(overriddenProps == null){
+        if (overriddenProps == null){
             throw new NullPointerException();
         }
-        this.zkUrl = zkUrl;
+        this.jdbcUrl = jdbcUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
@@ -53,8 +53,8 @@ class PhoenixDataSourceReadOptions implements Serializable {
         return scn;
     }
 
-    String getZkUrl() {
-        return zkUrl;
+    String getJdbcUrl() {
+        return jdbcUrl;
     }
 
     String getTenantId() {
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
index 8a34cae..6a7ee4d 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
@@ -57,9 +57,6 @@ import 
org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
 import org.apache.spark.sql.types.StructType;
 import scala.collection.Iterator;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-
 public class PhoenixPartitionReader implements PartitionReader<InternalRow> {
 
     private final PhoenixInputPartition inputPartition;
@@ -81,11 +78,10 @@ public class PhoenixPartitionReader implements 
PartitionReader<InternalRow> {
     }
 
     private QueryPlan getQueryPlan() throws SQLException {
-        String zkUrl = options.getZkUrl();
+        String jdbcUrl = options.getJdbcUrl();
         Properties overridingProps = getOverriddenPropsFromOptions();
         overridingProps.put("phoenix.skip.system.tables.existence.check", 
Boolean.valueOf("true"));
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overridingProps)) {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
overridingProps)) {
             PTable pTable = null;
             try {
                 pTable = PTable.parseFrom(options.getPTableCacheBytes());
@@ -172,7 +168,7 @@ public class PhoenixPartitionReader implements 
PartitionReader<InternalRow> {
 
     @Override
     public void close() throws IOException {
-        if(resultSet != null) {
+        if (resultSet != null) {
             try {
                 resultSet.close();
             } catch (SQLException e) {
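Because the partition readers now hand the resolved URL to `DriverManager.getConnection` unchanged, any URL that works with plain Phoenix JDBC should also work through the connector. A standalone connectivity check, with a hypothetical quorum address and the Phoenix client jar on the classpath:

```scala
import java.sql.DriverManager
import java.util.Properties

// Verify the same URL string that will later be passed via the "jdbcUrl" option.
val url = "jdbc:phoenix:zk1,zk2,zk3:2181"
val conn = DriverManager.getConnection(url, new Properties())
try {
  val rs = conn.createStatement()
    .executeQuery("SELECT TABLE_NAME FROM SYSTEM.CATALOG LIMIT 5")
  while (rs.next()) println(rs.getString(1))
} finally {
  conn.close()
}
```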
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
index 4e6af60..c72206e 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
@@ -51,14 +51,12 @@ import java.util.List;
 import java.util.Properties;
 
 import static 
org.apache.phoenix.spark.sql.connector.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixScan implements Scan, Batch {
 
     private final StructType schema;
     private final CaseInsensitiveStringMap options;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final Properties overriddenProps;
     private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
     private final String tableName;
@@ -72,8 +70,8 @@ public class PhoenixScan implements Scan, Batch {
         this.options = options;
         this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
-        tableName = options.get("table");
+        this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
+        this.tableName = options.get("table");
     }
 
     private void populateOverriddenProperties(){
@@ -82,7 +80,7 @@ public class PhoenixScan implements Scan, Batch {
         // Generate splits based off statistics, or just region splits?
         splitByStats = options.getBoolean(
                 PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, 
PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue != null) {
+        if (currentScnValue != null) {
             overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
currentScnValue);
         }
         if (tenantId != null){
@@ -109,7 +107,7 @@ public class PhoenixScan implements Scan, Batch {
     public InputPartition[] planInputPartitions() {
         populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, 
overriddenProps)) {
+                jdbcUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = 
PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
                     Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
@@ -151,7 +149,7 @@ public class PhoenixScan implements Scan, Batch {
                 byte[] pTableCacheBytes = 
PTableImpl.toProto(queryPlan.getTableRef().getTable()).
                     toByteArray();
                 phoenixDataSourceOptions =
-                        new PhoenixDataSourceReadOptions(zkUrl, 
currentScnValue,
+                        new PhoenixDataSourceReadOptions(jdbcUrl, 
currentScnValue,
                                 tenantId, selectStatement, overriddenProps, 
pTableCacheBytes);
 
                 if (splitByStats) {
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java
index b666bf1..5115f73 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixBatchWrite.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector.writer;
 
+import org.apache.phoenix.spark.sql.connector.PhoenixDataSource;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.spark.sql.connector.write.BatchWrite;
@@ -29,7 +30,6 @@ import org.apache.spark.sql.types.StructType;
 import java.util.Map;
 
 import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static 
org.apache.phoenix.spark.sql.connector.PhoenixDataSource.ZOOKEEPER_URL;
 import static 
org.apache.phoenix.spark.sql.connector.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static 
org.apache.phoenix.spark.sql.connector.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
@@ -60,12 +60,12 @@ public class PhoenixBatchWrite implements BatchWrite {
                                                                                
          StructType schema) {
         String scn = options.get(CURRENT_SCN_VALUE);
         String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB);
-        String zkUrl = options.get(ZOOKEEPER_URL);
+        String jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
         String tableName = options.get("table");
         boolean skipNormalizingIdentifier = 
Boolean.parseBoolean(options.getOrDefault(SKIP_NORMALIZING_IDENTIFIER, 
Boolean.toString(false)));
         return new PhoenixDataSourceWriteOptions.Builder()
                 .setTableName(tableName)
-                .setZkUrl(zkUrl)
+                .setJdbcUrl(jdbcUrl)
                 .setScn(scn)
                 .setTenantId(tenantId)
                 .setSchema(schema)
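The matching write-path sketch (output table and URL are placeholders; `SaveMode.Overwrite` follows the connector's existing write examples, and `skipNormalizingIdentifier` is the optional flag wired through above):

```scala
import org.apache.spark.sql.SaveMode

// Minimal sketch, assuming `df` matches the schema of OUTPUT_TABLE.
df.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .option("table", "OUTPUT_TABLE")
  .option("jdbcUrl", "jdbc:phoenix:zk1,zk2,zk3:2181")
  .option("skipNormalizingIdentifier", "true") // keep column identifiers as provided
  .save()
```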
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java
index 03baae1..adc8f37 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataSourceWriteOptions.java
@@ -28,30 +28,30 @@ import java.util.Properties;
 class PhoenixDataSourceWriteOptions implements Serializable {
 
     private final String tableName;
-    private final String zkUrl;
+    private final String jdbcUrl;
     private final String tenantId;
     private final String scn;
     private final StructType schema;
     private final boolean skipNormalizingIdentifier;
     private final Properties overriddenProps;
 
-    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, 
String scn,
+    private PhoenixDataSourceWriteOptions(String tableName, String jdbcUrl, 
String scn,
                                           String tenantId, StructType schema, 
boolean skipNormalizingIdentifier,
                                           Properties overriddenProps) {
         if (tableName == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("tableName must not be null");
         }
-        if (zkUrl == null) {
-            throw new NullPointerException();
+        if (jdbcUrl == null) {
+            throw new IllegalArgumentException("jdbcUrl must not be null");
         }
         if (schema == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("schema must not be null");
         }
         if (overriddenProps == null) {
-            throw new NullPointerException();
+            throw new IllegalArgumentException("overriddenProps must not be 
null");
         }
         this.tableName = tableName;
-        this.zkUrl = zkUrl;
+        this.jdbcUrl = jdbcUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.schema = schema;
@@ -63,8 +63,8 @@ class PhoenixDataSourceWriteOptions implements Serializable {
         return scn;
     }
 
-    String getZkUrl() {
-        return zkUrl;
+    String getJdbcUrl() {
+        return jdbcUrl;
     }
 
     String getTenantId() {
@@ -97,7 +97,7 @@ class PhoenixDataSourceWriteOptions implements Serializable {
 
     static class Builder {
         private String tableName;
-        private String zkUrl;
+        private String jdbcUrl;
         private String scn;
         private String tenantId;
         private StructType schema;
@@ -109,8 +109,8 @@ class PhoenixDataSourceWriteOptions implements Serializable 
{
             return this;
         }
 
-        Builder setZkUrl(String zkUrl) {
-            this.zkUrl = zkUrl;
+        Builder setJdbcUrl(String jdbcUrl) {
+            this.jdbcUrl = jdbcUrl;
             return this;
         }
 
@@ -140,7 +140,7 @@ class PhoenixDataSourceWriteOptions implements Serializable 
{
         }
 
         PhoenixDataSourceWriteOptions build() {
-            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, 
tenantId, schema,
+            return new PhoenixDataSourceWriteOptions(tableName, jdbcUrl, scn, 
tenantId, schema,
                     skipNormalizingIdentifier, overriddenProps);
         }
     }
diff --git 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
index 33fe0f4..ea5338f 100644
--- 
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
+++ 
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/writer/PhoenixDataWriter.java
@@ -50,8 +50,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
 
 import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
 import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
@@ -64,7 +62,7 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
     private ExpressionEncoder<Row> encoder;
 
     PhoenixDataWriter(StructType schema, PhoenixDataSourceWriteOptions 
options) {
-        String zkUrl = options.getZkUrl();
+        String jdbcUrl = options.getJdbcUrl();
         Properties connectionProps = options.getEffectiveProps();
         this.schema = options.getSchema();
 
@@ -74,8 +72,7 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
         }
         encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind( 
scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(),
 SimpleAnalyzer$.MODULE$);
         try {
-            this.conn = DriverManager.getConnection(JDBC_PROTOCOL + 
JDBC_PROTOCOL_SEPARATOR + zkUrl,
-                    connectionProps);
+            this.conn = DriverManager.getConnection(jdbcUrl, connectionProps);
             List<String> colNames =  new 
ArrayList<>(Arrays.asList(options.getSchema().names()));
             if (!options.skipNormalizingIdentifier()){
                 colNames = 
colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
diff --git 
a/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
 
b/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
index 9a06236..ae7e2bd 100644
--- 
a/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
+++ 
b/phoenix5-spark3/src/test/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSourceTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector;
 
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.Test;
 
@@ -82,4 +83,30 @@ public class PhoenixDataSourceTest {
         assertTrue(extractPhoenixHBaseConfFromOptions(null).isEmpty());
     }
 
+    @Test
+    public void testUrlFallbackLogic() {
+        Map<String, String> props = new HashMap<>();
+
+        assertEquals(PhoenixRuntime.JDBC_PROTOCOL, 
PhoenixDataSource.getJdbcUrlFromOptions(props));
+
+        // The fallback logic doesn't attempt to check the URL, except for the 
presence of
+        // the "jdbc:phoenix" prefix
+        String NOTANURL = "notanurl";
+
+        props.put(PhoenixDataSource.JDBC_URL, NOTANURL);
+        assertEquals(NOTANURL, PhoenixDataSource.getJdbcUrlFromOptions(props));
+
+        props.remove(PhoenixDataSource.JDBC_URL);
+        props.put(PhoenixDataSource.ZOOKEEPER_URL, NOTANURL);
+        assertEquals(
+            PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + NOTANURL,
+            PhoenixDataSource.getJdbcUrlFromOptions(props));
+
+        props.put(PhoenixDataSource.ZOOKEEPER_URL,
+            PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + NOTANURL);
+        assertEquals(
+            PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + NOTANURL,
+            PhoenixDataSource.getJdbcUrlFromOptions(props));
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 1f28456..b4c4cb8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,37 +83,16 @@
 
     <!-- Profiling is not enabled in the repo. Placeholder. -->
     <jacocoArgLine></jacocoArgLine>
-    <phoenix-surefire.argLine>-enableassertions -Xmx${surefire.Xmx}
-      -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
-      -Djava.awt.headless=true 
-Djdk.net.URLClassPath.disableClassPathURLCheck=true
-      -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced
-      -Dio.netty.eventLoopThreads=3 -Duser.timezone="America/Los_Angeles"
-      -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./target/
-      
"-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"
-    </phoenix-surefire.argLine>
-    <phoenix-surefire.jdk8.tuning.flags>
-        -XX:NewRatio=4 -XX:SurvivorRatio=8 -XX:+UseCompressedOops 
-XX:+UseConcMarkSweepGC
-        -XX:+DisableExplicitGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:+CMSClassUnloadingEnabled
-        -XX:+CMSScavengeBeforeRemark -XX:CMSInitiatingOccupancyFraction=68
-    </phoenix-surefire.jdk8.tuning.flags>
-    
<phoenix-surefire.jdk11.flags>-Dorg.apache.hbase.thirdparty.io.netty.tryReflectionSetAccessible=true
-      --add-modules jdk.unsupported
-      --add-opens java.base/java.nio=ALL-UNNAMED
-      --add-opens java.base/sun.nio.ch=ALL-UNNAMED
-      --add-opens java.base/java.lang=ALL-UNNAMED
-      --add-opens java.base/jdk.internal.ref=ALL-UNNAMED
-      --add-opens java.base/java.lang.reflect=ALL-UNNAMED
-      --add-opens java.base/java.util=ALL-UNNAMED
-      --add-opens java.base/java.util.concurrent=ALL-UNNAMED
-      --add-exports java.base/jdk.internal.misc=ALL-UNNAMED
-      --add-exports 
java.security.jgss/sun.security.krb5=ALL-UNNAMED</phoenix-surefire.jdk11.flags>
-    <phoenix-surefire.jdk11.tuning.flags>
-        ${phoenix-surefire.jdk8.tuning.flags}
-    </phoenix-surefire.jdk11.tuning.flags>
+    <!-- Hard to read, but ScalaTest cannot handle multiline argLine. 
+         It cannot resolve @{} either-->
+    <phoenix-surefire.argLine>-enableassertions -Xmx${surefire.Xmx} 
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true 
-Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true 
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced 
-Dio.netty.eventLoopThreads=3 -Duser.timezone="America/Los_Angeles" 
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./target/ 
"-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path 
[...]
+    <phoenix-surefire.jdk8.tuning.flags>-XX:NewRatio=4 -XX:SurvivorRatio=8 
-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC 
-XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled 
-XX:+CMSScavengeBeforeRemark 
-XX:CMSInitiatingOccupancyFraction=68</phoenix-surefire.jdk8.tuning.flags>
+    
<phoenix-surefire.jdk11.flags>-Dorg.apache.hbase.thirdparty.io.netty.tryReflectionSetAccessible=true
 --add-modules jdk.unsupported --add-opens java.base/java.nio=ALL-UNNAMED 
--add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens 
java.base/java.lang=ALL-UNNAMED --add-opens 
java.base/jdk.internal.ref=ALL-UNNAMED --add-opens 
java.base/java.lang.reflect=ALL-UNNAMED --add-opens 
java.base/java.util=ALL-UNNAMED --add-opens 
java.base/java.util.concurrent=ALL-UNNAMED --add-exports java.base [...]
+    
<phoenix-surefire.jdk11.tuning.flags>${phoenix-surefire.jdk8.tuning.flags}</phoenix-surefire.jdk11.tuning.flags>
     <phoenix-surefire.jdk17.flags>--add-opens 
java.base/jdk.internal.util.random=ALL-UNNAMED</phoenix-surefire.jdk17.flags>
     <phoenix-surefire.jdk17.tuning.flags></phoenix-surefire.jdk17.tuning.flags>
     <!-- Surefire argLine defaults for Linux + JDK8 -->
-    <argLine>${phoenix-surefire.argLine} ${phoenix-surefire.jdk8.tuning.flags} 
@{jacocoArgLine}</argLine>
+    <argLine>${phoenix-surefire.argLine} 
${phoenix-surefire.jdk8.tuning.flags}</argLine>
 
     <!-- Dependency versions -->
     <hive3.version>3.1.2</hive3.version>
