PHOENIX-2005 Connection utilities omit zk client port, parent znode
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c6b37b97 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c6b37b97 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c6b37b97 Branch: refs/heads/4.x-HBase-1.1 Commit: c6b37b979da1b514bcb9257c7e095e39b0c2c215 Parents: 3cdc323 Author: Nick Dimiduk <ndimi...@apache.org> Authored: Tue May 26 11:11:48 2015 -0700 Committer: Nick Dimiduk <ndimi...@apache.org> Committed: Tue May 26 13:27:03 2015 -0700 ---------------------------------------------------------------------- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 28 ++++-- .../phoenix/mapreduce/CsvBulkLoadTool.java | 93 ++++++++++---------- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 26 +----- .../query/ConnectionQueryServicesImpl.java | 4 +- .../java/org/apache/phoenix/util/QueryUtil.java | 45 ++++++++-- .../phoenix/jdbc/PhoenixEmbeddedDriverTest.java | 14 ++- .../phoenix/mapreduce/CsvBulkLoadToolTest.java | 11 --- .../mapreduce/CsvToKeyValueMapperTest.java | 15 ---- .../org/apache/phoenix/util/QueryUtilTest.java | 33 ++++--- 9 files changed, 139 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index 9e95667..2451603 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -31,6 +31,7 @@ import java.util.logging.Logger; import javax.annotation.concurrent.Immutable; +import org.apache.hadoop.hbase.HConstants; import 
org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -174,10 +175,10 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni } /** - * + * * Class to encapsulate connection info for HBase * - * + * * @since 0.1.1 */ public static class ConnectionInfo { @@ -204,12 +205,18 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni return false; } - protected static ConnectionInfo create(String url) throws SQLException { - StringTokenizer tokenizer = new StringTokenizer(url == null ? "" : url.substring(PhoenixRuntime.JDBC_PROTOCOL.length()),DELIMITERS, true); + public static ConnectionInfo create(String url) throws SQLException { + url = url == null ? "" : url; + url = url.startsWith(PhoenixRuntime.JDBC_PROTOCOL) + ? url.substring(PhoenixRuntime.JDBC_PROTOCOL.length()) + : url; + StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true); int nTokens = 0; String[] tokens = new String[5]; String token = null; - while (tokenizer.hasMoreTokens() && !(token=tokenizer.nextToken()).equals(TERMINATOR) && tokenizer.hasMoreTokens() && nTokens < tokens.length) { + while (tokenizer.hasMoreTokens() && + !(token=tokenizer.nextToken()).equals(TERMINATOR) && + tokenizer.hasMoreTokens() && nTokens < tokens.length) { token = tokenizer.nextToken(); // This would mean we have an empty string for a token which is illegal if (DELIMITERS.contains(token)) { @@ -316,8 +323,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni private final String principal; private final String keytab; - // used for testing - ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) { + public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) { this.zookeeperQuorum = zookeeperQuorum; this.port = port; 
this.rootNode = rootNode; @@ -326,8 +332,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni this.keytab = keytab; } - // used for testing - ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) { + public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) { this(zookeeperQuorum, port, rootNode, null, null); } @@ -417,6 +422,11 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni + (principal == null ? "" : ":" + principal) + (keytab == null ? "" : ":" + keytab); } + + public String toUrl() { + return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + + toString(); + } } public static boolean isTestUrl(String url) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index 31f8b42..7afde98 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -18,11 +18,11 @@ package org.apache.phoenix.mapreduce; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import 
org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -56,8 +55,8 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.job.JobManager; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -67,6 +66,7 @@ import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; @@ -84,7 +84,7 @@ public class CsvBulkLoadTool extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(CsvBulkLoadTool.class); - static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Zookeeper quorum to connect to (optional)"); + static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)"); static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input CSV path (mandatory)"); static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)"); static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)"); @@ -184,35 +184,48 @@ public class CsvBulkLoadTool extends Configured implements Tool { } catch (IllegalStateException e) { printHelpAndExit(e.getMessage(), getOptions()); } - Class.forName(DriverManager.class.getName()); - Connection conn = DriverManager.getConnection( - getJdbcUrl(cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()))); - - return 
loadData(conf, cmdLine, conn); + return loadData(conf, cmdLine); } - private int loadData(Configuration conf, CommandLine cmdLine, - Connection conn) throws SQLException, InterruptedException, - ExecutionException { - String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt()); + private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException, + InterruptedException, ExecutionException, ClassNotFoundException { + String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt()); String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()); String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt()); String qualifiedTableName = getQualifiedTableName(schemaName, tableName); - String qualifedIndexTableName = null; - if(indexTableName != null){ - qualifedIndexTableName = getQualifiedTableName(schemaName, indexTableName); + String qualifiedIndexTableName = null; + if (indexTableName != null){ + qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName); + } + + if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) { + // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job. 
+ String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()); + PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum); + LOG.info("Configuring HBase connection to {}", info); + for (Map.Entry<String,String> entry : info.asProps()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue()); + } + conf.set(entry.getKey(), entry.getValue()); + } + } + + final Connection conn = QueryUtil.getConnection(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(), + qualifiedTableName); } List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName); configureOptions(cmdLine, importColumns, conf); - try { validateTable(conn, schemaName, tableName); } finally { conn.close(); } - Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt())); - Path outputPath = null; + final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt())); + final Path outputPath; if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) { outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())); } else { @@ -221,20 +234,21 @@ public class CsvBulkLoadTool extends Configured implements Tool { List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>(); tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName)); + // using conn after it's been closed... 
o.O tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName)); // When loading a single index table, check index table name is correct - if(qualifedIndexTableName != null){ + if (qualifiedIndexTableName != null){ TargetTableRef targetIndexRef = null; for (TargetTableRef tmpTable : tablesToBeLoaded){ - if(tmpTable.getLogicalName().compareToIgnoreCase(qualifedIndexTableName) == 0) { + if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) { targetIndexRef = tmpTable; break; } } - if(targetIndexRef == null){ + if (targetIndexRef == null){ throw new IllegalStateException("CSV Bulk Loader error: index table " + - qualifedIndexTableName + " doesn't exist"); + qualifiedIndexTableName + " doesn't exist"); } tablesToBeLoaded.clear(); tablesToBeLoaded.add(targetIndexRef); @@ -247,13 +261,14 @@ public class CsvBulkLoadTool extends Configured implements Tool { .getProps() .getBoolean(QueryServices.METRICS_ENABLED, QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED); - ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool); + ExecutorService executor = + JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool); try{ for (TargetTableRef table : tablesToBeLoaded) { Path tablePath = new Path(outputPath, table.getPhysicalName()); Configuration jobConf = new Configuration(conf); jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName); - if(qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) { + if (qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) { jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table.getPhysicalName()); } TableLoader tableLoader = new TableLoader( @@ -274,14 +289,6 @@ public class CsvBulkLoadTool extends Configured implements Tool { return retCode; } - String getJdbcUrl(String zkQuorum) { - if (zkQuorum == null) { - LOG.warn("Defaulting to localhost for ZooKeeper quorum"); - zkQuorum = 
"localhost:2181"; - } - return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; - } - /** * Build up the list of columns to be imported. The list is taken from the command line if * present, otherwise it is taken from the table description. @@ -327,9 +334,11 @@ public class CsvBulkLoadTool extends Configured implements Tool { * @param importColumns descriptors of columns to be imported * @param conf job configuration */ - @VisibleForTesting - static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns, - Configuration conf) { + private static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns, + Configuration conf) throws SQLException { + + // we don't parse ZK_QUORUM_OPT here because we need it in order to + // create the connection we need to build importColumns. char delimiterChar = ','; if (cmdLine.hasOption(DELIMITER_OPT.getOpt())) { @@ -358,12 +367,6 @@ public class CsvBulkLoadTool extends Configured implements Tool { escapeChar = escapeString.charAt(0); } - if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) { - String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt()); - LOG.info("Configuring ZK quorum to {}", zkQuorum); - conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); - } - CsvBulkImportUtil.initCsvImportJob( conf, getQualifiedTableName( @@ -493,7 +496,7 @@ public class CsvBulkLoadTool extends Configured implements Tool { job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); - // initialize credentials to possibily run in a secure env + // initialize credentials to possibly run in a secure env TableMapReduceUtil.initCredentials(job); HTable htable = new HTable(conf, tableName); @@ -522,8 +525,8 @@ public class CsvBulkLoadTool extends Configured implements Tool { } return true; - } catch(Exception ex) { - LOG.error("Import job on table=" + tableName + " failed due to exception:" + ex); + } catch (Exception ex) { + LOG.error("Import 
job on table=" + tableName + " failed due to exception.", ex); return false; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index 90cb854..c0328bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -19,7 +19,6 @@ package org.apache.phoenix.mapreduce; import java.io.IOException; import java.io.StringReader; -import java.sql.DriverManager; import java.sql.SQLException; import java.util.Iterator; import java.util.List; @@ -32,7 +31,6 @@ import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -41,11 +39,11 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.csv.CsvUpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +106,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes protected void setup(Context 
context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - String jdbcUrl = getJdbcUrl(conf); // pass client configuration into driver Properties clientInfos = new Properties(); @@ -118,12 +115,9 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes clientInfos.setProperty(entry.getKey(), entry.getValue()); } - // This statement also ensures that the driver class is loaded - LOG.info("Connection with driver {} with url {}", PhoenixDriver.class.getName(), jdbcUrl); - try { - conn = (PhoenixConnection) DriverManager.getConnection(jdbcUrl, clientInfos); - } catch (SQLException e) { + conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); + } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); } @@ -189,20 +183,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes } } - /** - * Build up the JDBC URL for connecting to Phoenix. - * - * @return the full JDBC URL for a Phoenix connection - */ - @VisibleForTesting - static String getJdbcUrl(Configuration conf) { - String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); - if (zkQuorum == null) { - throw new IllegalStateException(HConstants.ZOOKEEPER_QUORUM + " is not configured"); - } - return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; - } - @VisibleForTesting CsvUpsertExecutor buildUpsertExecutor(Configuration conf) { String tableName = conf.get(TABLE_NAME_CONFKEY); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 30b43d5..b071bc5 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -807,7 +807,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean isMetaTable = SchemaUtil.isMetaTable(tableName); boolean tableExist = true; try { - logger.info("Found quorum: " + ZKConfig.getZKQuorumServersString(config)); + final String quorum = ZKConfig.getZKQuorumServersString(config); + final String znode = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT); + logger.debug("Found quorum: " + quorum + ":" + znode); admin = new HBaseAdmin(config); try { existingDesc = admin.getTableDescriptor(tableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java index d63a68f..bd38983 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java @@ -34,11 +34,13 @@ import javax.annotation.Nullable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.WildcardParseNode; @@ -49,8 +51,6 @@ import com.google.common.base.Joiner; import 
com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import java.util.Iterator; -import java.util.Map; public final class QueryUtil { @@ -129,7 +129,7 @@ public final class QueryUtil { * * @param tableName name of the table for which the upsert statement is to be created * @param columns list of columns to be included in the upsert statement - * @param Hint hint to be added to the UPSERT statement. + * @param hint hint to be added to the UPSERT statement. * @return the created {@code UPSERT} statement */ public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) { @@ -222,13 +222,36 @@ public final class QueryUtil { return query.toString(); } - public static String getUrl(String server) { - return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server; + /** + * Create the Phoenix JDBC connection URL from the provided cluster connection details. + */ + public static String getUrl(String zkQuorum) { + return getUrlInternal(zkQuorum, null, null); + } + + /** + * Create the Phoenix JDBC connection URL from the provided cluster connection details. + */ + public static String getUrl(String zkQuorum, int clientPort) { + return getUrlInternal(zkQuorum, clientPort, null); + } + + /** + * Create the Phoenix JDBC connection URL from the provided cluster connection details. + */ + public static String getUrl(String zkQuorum, String znodeParent) { + return getUrlInternal(zkQuorum, null, znodeParent); + } + + /** + * Create the Phoenix JDBC connection URL from the provided cluster connection details. 
+ */ + public static String getUrl(String zkQuorum, int port, String znodeParent) { + return getUrlInternal(zkQuorum, port, znodeParent); } - public static String getUrl(String server, long port) { - String serverUrl = getUrl(server); - return serverUrl + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + port + private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent) { + return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent).toUrl() + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; } @@ -274,6 +297,7 @@ public final class QueryUtil { public static String getConnectionUrl(Properties props, Configuration conf) throws ClassNotFoundException, SQLException { + // TODO: props is ignored! // make sure we load the phoenix driver Class.forName(PhoenixDriver.class.getName()); @@ -304,12 +328,15 @@ public final class QueryUtil { if (port == -1) { port = conf.getInt(QueryServices.ZOOKEEPER_PORT_ATTRIB, -1); if (port == -1) { + // TODO: fall back to the default in HConstants#DEFAULT_ZOOKEPER_CLIENT_PORT throw new RuntimeException("Client zk port was not set!"); } } server = Joiner.on(',').join(servers); + String znodeParent = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); - return getUrl(server, port); + return getUrl(server, port, znodeParent); } public static String getViewStatement(String schemaName, String tableName, String where) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java index 79f9ec6..083b205 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java +++ 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java @@ -40,9 +40,11 @@ public class PhoenixEmbeddedDriverTest { "jdbc:phoenix:localhost:123", "jdbc:phoenix:localhost:123;foo=bar", "jdbc:phoenix:localhost:123:/hbase", - "jdbc:phoenix:localhost:123:/hbase;foo=bas", + "jdbc:phoenix:localhost:123:/foo-bar", + "jdbc:phoenix:localhost:123:/foo-bar;foo=bas", "jdbc:phoenix:localhost:/hbase", - "jdbc:phoenix:localhost:/hbase;test=true", + "jdbc:phoenix:localhost:/foo-bar", + "jdbc:phoenix:localhost:/foo-bar;test=true", "jdbc:phoenix:v1,v2,v3", "jdbc:phoenix:v1,v2,v3;", "jdbc:phoenix:v1,v2,v3;test=true", @@ -51,6 +53,7 @@ public class PhoenixEmbeddedDriverTest { "jdbc:phoenix:v1,v2,v3:123:/hbase", "jdbc:phoenix:v1,v2,v3:123:/hbase;test=false", "jdbc:phoenix:v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false", + "jdbc:phoenix:v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false", "jdbc:phoenix:v1,v2,v3:123:user/principal:/user.keytab;test=false", "jdbc:phoenix:v1,v2,v3:user/principal:/user.keytab;test=false", "jdbc:phoenix:v1,v2,v3:/hbase:user/principal:/user.keytab;test=false", @@ -64,9 +67,11 @@ public class PhoenixEmbeddedDriverTest { new ConnectionInfo("localhost",123,null), new ConnectionInfo("localhost",123,null), new ConnectionInfo("localhost",123,"/hbase"), - new ConnectionInfo("localhost",123,"/hbase"), - new ConnectionInfo("localhost",null,"/hbase"), + new ConnectionInfo("localhost",123,"/foo-bar"), + new ConnectionInfo("localhost",123,"/foo-bar"), new ConnectionInfo("localhost",null,"/hbase"), + new ConnectionInfo("localhost",null,"/foo-bar"), + new ConnectionInfo("localhost",null,"/foo-bar"), new ConnectionInfo("v1,v2,v3",null,null), new ConnectionInfo("v1,v2,v3",null,null), new ConnectionInfo("v1,v2,v3",null,null), @@ -75,6 +80,7 @@ public class PhoenixEmbeddedDriverTest { new ConnectionInfo("v1,v2,v3",123,"/hbase"), new ConnectionInfo("v1,v2,v3",123,"/hbase"), new 
ConnectionInfo("v1,v2,v3",123,"/hbase","user/principal", "/user.keytab" ), + new ConnectionInfo("v1,v2,v3",123,"/foo-bar","user/principal", "/user.keytab" ), new ConnectionInfo("v1,v2,v3",123, null,"user/principal", "/user.keytab" ), new ConnectionInfo("v1,v2,v3", null, null,"user/principal", "/user.keytab" ), new ConnectionInfo("v1,v2,v3",null,"/hbase","user/principal", "/user.keytab" ), http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java index 31fc71c..33bb976 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java @@ -66,15 +66,4 @@ public class CsvBulkLoadToolTest { public void testGetQualifiedTableName_NullSchema() { assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, "myTable")); } - - @Test - public void testGetJdbcUrl_WithQuorumSupplied() { - assertEquals("jdbc:phoenix:myzkhost:2181", bulkLoadTool.getJdbcUrl("myzkhost:2181")); - } - - @Test - public void testGetJdbcUrl_NoQuorumSupplied() { - assertEquals("jdbc:phoenix:localhost:2181", bulkLoadTool.getJdbcUrl(null)); - } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java index 4033a65..dc6f497 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java 
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java @@ -95,21 +95,6 @@ public class CsvToKeyValueMapperTest { } @Test - public void testGetJdbcUrl() { - Configuration conf = new Configuration(); - conf.set(HConstants.ZOOKEEPER_QUORUM, "myzkclient:2181"); - String jdbcUrl = CsvToKeyValueMapper.getJdbcUrl(conf); - - assertEquals("jdbc:phoenix:myzkclient:2181", jdbcUrl); - } - - @Test(expected=IllegalStateException.class) - public void testGetJdbcUrl_NotConfigured() { - Configuration conf = new Configuration(); - CsvToKeyValueMapper.getJdbcUrl(conf); - } - - @Test public void testLoadPreUpdateProcessor() { Configuration conf = new Configuration(); conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class, http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java index beabaf1..8446e9e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java @@ -17,10 +17,6 @@ */ package org.apache.phoenix.util; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.sql.Types; import java.util.Properties; @@ -30,6 +26,8 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; +import static org.junit.Assert.*; + public class QueryUtilTest { private static final ColumnInfo ID_COLUMN = new ColumnInfo("ID", Types.BIGINT); @@ -96,19 +94,28 @@ public class QueryUtilTest { } private void validateUrl(String url) { - String prefix = QueryUtil.getUrl(""); + String prefix = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; assertTrue("JDBC URL missing jdbc protocol prefix", url.startsWith(prefix)); - //remove the prefix, should only be left with server,server...:port - url = url.substring(prefix.length()+1); - // make sure only a single ':' - assertEquals("More than a single ':' in url: "+url, url.indexOf(PhoenixRuntime - .JDBC_PROTOCOL_SEPARATOR), - url.lastIndexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR)); + assertTrue("JDBC URL missing jdbc terminator suffix", url.endsWith(";")); + // remove the prefix, should only be left with server[,server...]:port:/znode + url = url.substring(prefix.length()); + String[] splits = url.split(":"); + assertTrue("zk details should contain at least server component", splits.length >= 1); // make sure that each server is comma separated - url = url.substring(0, url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR)); - String[] servers = url.split(","); + String[] servers = splits[0].split(","); for(String server: servers){ assertFalse("Found whitespace in server names for url: " + url, server.contains(" ")); } + if (splits.length >= 2) { + // second bit is a port number, should not throw + try { + Integer.parseInt(splits[1]); + } catch (NumberFormatException e) { + fail(e.getMessage()); + } + } + if (splits.length >= 3) { + assertTrue("znode parent is not an absolute path", splits[2].startsWith("/")); + } } } \ No newline at end of file