This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new 21f4532ba3 PHOENIX-7193 Fix cluster override for mapreduce jobs for non-ZK registries 21f4532ba3 is described below commit 21f4532ba3781d653ac66f1ba03b579dc41f8641 Author: Istvan Toth <st...@apache.org> AuthorDate: Wed Jan 31 15:56:04 2024 +0100 PHOENIX-7193 Fix cluster override for mapreduce jobs for non-ZK registries --- .../phoenix/mapreduce/PhoenixOutputFormat.java | 5 +- .../phoenix/mapreduce/PhoenixRecordWriter.java | 2 +- .../phoenix/mapreduce/util/ConnectionUtil.java | 89 ++++++---------------- .../mapreduce/util/PhoenixConfigurationUtil.java | 79 ++++++++++++++++++- .../apache/phoenix/index/IndexUpgradeToolTest.java | 15 +++- .../mapreduce/PhoenixMultiViewInputFormatTest.java | 29 ++++--- .../util/PhoenixConfigurationUtilTest.java | 24 +++--- .../apache/phoenix/query/ConnectionlessTest.java | 6 +- .../org/apache/phoenix/util/QueryUtilTest.java | 5 ++ 9 files changed, 153 insertions(+), 101 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java index 055ce1f93c..23847cb397 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java @@ -38,14 +38,13 @@ import org.slf4j.LoggerFactory; */ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> { private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixOutputFormat.class); - private final Set<String> propsToIgnore; public PhoenixOutputFormat() { this(Collections.<String>emptySet()); } + // FIXME Never used, and the ignore feature didn't work anyway public PhoenixOutputFormat(Set<String> propsToIgnore) { - this.propsToIgnore = propsToIgnore; } @Override @@ -63,7 +62,7 @@ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<Nul @Override public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { try { - return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore); + return new PhoenixRecordWriter<T>(context.getConfiguration()); } catch (SQLException e) { LOGGER.error("Error calling PhoenixRecordWriter " + e.getMessage()); throw new RuntimeException(e); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java index 6f5b84e366..14e986c159 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java @@ -54,7 +54,7 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException { Connection connection = null; try { - connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore); + connection = ConnectionUtil.getOutputConnection(configuration); this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration); final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); this.statement = connection.prepareStatement(upsertQuery); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java index ccd55fd059..7b7d8431be 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java @@ -20,30 +20,25 @@ package org.apache.phoenix.mapreduce.util; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.Collections; import java.util.Properties; -import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.QueryUtil; /** * Utility class to return a {@link Connection} . */ public class ConnectionUtil { - private static String TEST_PARAM = - PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; - /** * Retrieve the configured input Connection. * @param conf configuration containing connection information * @return the configured input connection */ public static Connection getInputConnection(final Configuration conf) throws SQLException { + Preconditions.checkNotNull(conf); return getInputConnection(conf, new Properties()); } @@ -55,22 +50,17 @@ public class ConnectionUtil { */ public static Connection getInputConnection(final Configuration conf, final Properties props) throws SQLException { - Preconditions.checkNotNull(conf); - String zkQuorumOverride = PhoenixConfigurationUtil.getInputClusterZkQuorum(conf); - if (zkQuorumOverride != null) { - return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride, - PropertiesUtil.combineProperties(props, conf)); - } else { - // FIXME find some better way to get tests working - String zkQuorumForTest = PhoenixConfigurationUtil.getZKQuorum(conf); - if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM) - || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) { - return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumForTest, - PropertiesUtil.combineProperties(props, conf)); - } - return DriverManager.getConnection("jdbc:phoenix", - PropertiesUtil.combineProperties(props, conf)); + String inputQuorum = PhoenixConfigurationUtil.getInputCluster(conf); + if (inputQuorum != null) { + // This will not override the quorum set with setInputClusterUrl + Properties copyProps = PropertiesUtil.deepCopy(props); + copyProps.setProperty(HConstants.CLIENT_ZOOKEEPER_QUORUM, inputQuorum); + return DriverManager.getConnection( + PhoenixConfigurationUtil.getInputClusterUrl(conf), + PropertiesUtil.combineProperties(copyProps, conf)); } + return DriverManager.getConnection(PhoenixConfigurationUtil.getInputClusterUrl(conf), + PropertiesUtil.combineProperties(props, conf)); } /** @@ -82,16 +72,6 @@ public class ConnectionUtil { return getOutputConnection(conf, new Properties()); } - /** - * Create the configured output Connection. - * @param conf configuration containing the connection information - * @return the configured output connection - */ - public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, - Set<String> ignoreTheseProps) throws SQLException { - return getOutputConnection(conf, new Properties(), ignoreTheseProps); - } - /** * Create the configured output Connection. * @param conf configuration containing the connection information @@ -100,42 +80,17 @@ public class ConnectionUtil { */ public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException { - return getOutputConnection(conf, props, Collections.<String>emptySet()); - } - - public static Connection getOutputConnection(final Configuration conf, Properties props, - Set<String> withoutTheseProps) throws SQLException { Preconditions.checkNotNull(conf); - String zkQuorumOverride = PhoenixConfigurationUtil.getOutputClusterZkQuorum(conf); - if (zkQuorumOverride != null) { - return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride, - PropertiesUtil.combineProperties(props, conf)); - } else { - // FIXME find some better way to get tests working - String zkQuorumForTest = PhoenixConfigurationUtil.getZKQuorum(conf); - if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM) - || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) { - return DriverManager.getConnection("jdbc:phoenix:" + zkQuorumForTest, - PropertiesUtil.combineProperties(props, conf)); - } - return DriverManager.getConnection("jdbc:phoenix", - PropertiesUtil.combineProperties(props, conf)); + String outputQuorum = PhoenixConfigurationUtil.getOutputCluster(conf); + if (outputQuorum != null) { + // This will not override the quorum set with setInputClusterUrl + Properties copyProps = PropertiesUtil.deepCopy(props); + copyProps.setProperty(HConstants.CLIENT_ZOOKEEPER_QUORUM, outputQuorum); + return DriverManager.getConnection( + PhoenixConfigurationUtil.getInputClusterUrl(conf), + PropertiesUtil.combineProperties(copyProps, conf)); } + return DriverManager.getConnection(PhoenixConfigurationUtil.getOutputClusterUrl(conf), + PropertiesUtil.combineProperties(props, conf)); } - - /** - * Returns the {@link Connection} from a ZooKeeper cluster string. - * @param quorum a ZooKeeper quorum connection string - * @param clientPort a ZooKeeper client port - * @param znodeParent a zookeeper znode parent - * @return a Phoenix connection to the given connection string - */ - @Deprecated - private static Connection getConnection(final String quorum, final Integer clientPort, - final String znodeParent, Properties props) throws SQLException { - Preconditions.checkNotNull(quorum); - return DriverManager.getConnection(QueryUtil.getUrl(quorum, clientPort, znodeParent), - props); - } - } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 6a368110c1..da5869175d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -111,10 +111,16 @@ public final class PhoenixConfigurationUtil { /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */ public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor"; + @Deprecated public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM = "phoenix.mapreduce.input.cluster.quorum"; - + + @Deprecated public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoneix.mapreduce.output.cluster.quorum"; + public static final String MAPREDUCE_INPUT_CLUSTER_URL = "phoenix.mapreduce.input.cluster.url"; + + public static final String MAPREDUCE_OUTPUT_CLUSTER_URL = "phoenix.mapreduce.output.cluster.url"; + public static final String INDEX_DISABLED_TIMESTAMP_VALUE = "phoenix.mr.index.disableTimestamp"; public static final String INDEX_MAINTAINERS = "phoenix.mr.index.maintainers"; @@ -369,6 +375,7 @@ public final class PhoenixConfigurationUtil { * @param configuration * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will read from */ + @Deprecated public static void setInputCluster(final Configuration configuration, final String quorum) { Preconditions.checkNotNull(configuration); @@ -380,12 +387,35 @@ public final class PhoenixConfigurationUtil { * @param configuration * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will write to */ + @Deprecated public static void setOutputCluster(final Configuration configuration, final String quorum) { Preconditions.checkNotNull(configuration); configuration.set(MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum); } - + + /** + * Sets which HBase cluster a Phoenix MapReduce job should read from + * @param configuration + * @param url Phoenix JDBC URL + */ + public static void setInputClusterUrl(final Configuration configuration, + final String url) { + Preconditions.checkNotNull(configuration); + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_INPUT_CLUSTER_URL, url); + } + + /** + * Sets which HBase cluster a Phoenix MapReduce job should write to + * @param configuration + * @param url Phoenix JDBC URL string for HBase cluster the MapReduce job will write to + */ + public static void setOutputClusterUrl(final Configuration configuration, + final String url) { + Preconditions.checkNotNull(configuration); + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_OUTPUT_CLUSTER_URL, url); + } + public static Class<?> getInputClass(final Configuration configuration) { return configuration.getClass(INPUT_CLASS, NullDBWritable.class); } @@ -587,32 +617,71 @@ public final class PhoenixConfigurationUtil { public static String getInputCluster(final Configuration configuration) { Preconditions.checkNotNull(configuration); String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM); + if (quorum == null) { + quorum = configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM); + } if (quorum == null) { quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM); } return quorum; } + /** + * Returns the Phoenix JDBC URL a Phoenix MapReduce job will read + * from. If MAPREDUCE_INPUT_CLUSTER_URL is not set, then it returns the value of + * "jdbc:phoenix" + * @param configuration + * @return URL string + */ + public static String getInputClusterUrl(final Configuration configuration) { + Preconditions.checkNotNull(configuration); + String url = configuration.get(MAPREDUCE_INPUT_CLUSTER_URL); + if (url == null) { + url = PhoenixRuntime.JDBC_PROTOCOL; + } + return url; + } + + /** * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will * read from * @param configuration * @return ZooKeeper quorum string if defined, null otherwise */ + @Deprecated public static String getInputClusterZkQuorum(final Configuration configuration) { Preconditions.checkNotNull(configuration); return configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM); } + /** + * Returns the Phoenix JDBC URL a Phoenix MapReduce job will write to. + * If MAPREDUCE_OUTPUT_CLUSTER_URL is not set, then it returns the value of + * "jdbc:phoenix" + * @param configuration + * @return URL string + */ + public static String getOutputClusterUrl(final Configuration configuration) { + Preconditions.checkNotNull(configuration); + String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_URL); + if (quorum == null) { + quorum = PhoenixRuntime.JDBC_PROTOCOL; + } + return quorum; + } + /** * Returns the value of HConstants.ZOOKEEPER_QUORUM. * For tests only * @param configuration * @return ZooKeeper quorum string if defined, null otherwise */ + @Deprecated public static String getZKQuorum(final Configuration configuration) { Preconditions.checkNotNull(configuration); - return configuration.get(HConstants.ZOOKEEPER_QUORUM); + return configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM, + configuration.get(HConstants.ZOOKEEPER_QUORUM)); } /** @@ -626,6 +695,9 @@ public final class PhoenixConfigurationUtil { public static String getOutputCluster(final Configuration configuration) { Preconditions.checkNotNull(configuration); String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM); + if (quorum == null) { + quorum = configuration.get(HConstants.CLIENT_ZOOKEEPER_QUORUM); + } if (quorum == null) { quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM); } @@ -637,6 +709,7 @@ public final class PhoenixConfigurationUtil { * @param configuration * @return ZooKeeper quorum string if defined, null otherwise */ + @Deprecated public static String getOutputClusterZkQuorum(final Configuration configuration) { Preconditions.checkNotNull(configuration); return configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java index f06376fd43..c90a56d43f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java @@ -19,6 +19,10 @@ package org.apache.phoenix.index; import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP; import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP; +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; +import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; import java.sql.Connection; import java.util.Arrays; @@ -34,6 +38,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.mapreduce.index.IndexUpgradeTool; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.query.ConnectionlessTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.PhoenixRuntime; @@ -47,7 +54,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class IndexUpgradeToolTest { +public class IndexUpgradeToolTest extends BaseConnectionlessQueryTest{ private static final String INPUT_LIST = "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3"; private final boolean upgrade; private static final String DUMMY_STRING_VALUE = "anyValue"; @@ -174,7 +181,11 @@ public class IndexUpgradeToolTest { } private void setupConfForConnectionlessQuery(Configuration conf) { - conf.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS); + String connectionlessUrl = PhoenixRuntime.JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR; + PhoenixConfigurationUtil.setInputClusterUrl(conf, connectionlessUrl); + PhoenixConfigurationUtil.setOutputClusterUrl(conf, connectionlessUrl); conf.unset(HConstants.ZOOKEEPER_CLIENT_PORT); conf.unset(HConstants.ZOOKEEPER_ZNODE_PARENT); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java index b3677b27d2..e8f2864e8d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java @@ -17,13 +17,6 @@ */ package org.apache.phoenix.mapreduce; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.mapred.JobContext; -import org.apache.phoenix.util.PhoenixRuntime; -import org.junit.Test; -import org.mockito.Mockito; - import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.fail; import static org.apache.phoenix.mapreduce.util. @@ -32,17 +25,33 @@ import static org.apache.phoenix.mapreduce.util. PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ; import static org.apache.phoenix.mapreduce.util. PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ; +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; +import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; import static org.mockito.Mockito.when; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobContext; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.junit.Test; +import org.mockito.Mockito; + + public class PhoenixMultiViewInputFormatTest { + private static String CONNECTIONLESS_URL = + JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR; + @Test public void testDefaultConfig() throws Exception { PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat(); Configuration config = new Configuration(); config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10"); - config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS); + PhoenixConfigurationUtil.setInputClusterUrl(config, CONNECTIONLESS_URL); JobContext mockContext = Mockito.mock(JobContext.class); when(mockContext.getConfiguration()).thenReturn(config); @@ -58,7 +67,7 @@ public class PhoenixMultiViewInputFormatTest { Configuration config = new Configuration(); config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10"); config.set(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ, "dummy.path"); - config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS); + PhoenixConfigurationUtil.setInputClusterUrl(config, CONNECTIONLESS_URL); JobContext mockContext = Mockito.mock(JobContext.class); when(mockContext.getConfiguration()).thenReturn(config); @@ -77,7 +86,7 @@ public class PhoenixMultiViewInputFormatTest { Configuration config = new Configuration(); config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10"); config.set(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ, "dummy.path"); - config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS); + PhoenixConfigurationUtil.setInputClusterUrl(config, CONNECTIONLESS_URL); JobContext mockContext = Mockito.mock(JobContext.class); when(mockContext.getConfiguration()).thenReturn(config); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java index b678514c64..5052861bb1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java @@ -17,9 +17,6 @@ */ package org.apache.phoenix.mapreduce.util; -import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; -import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; import static org.junit.Assert.assertEquals; import java.sql.Connection; @@ -28,10 +25,12 @@ import java.sql.DriverManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.jdbc.ZKConnectionInfo; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -45,10 +44,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost"; private static final String OVERRIDE_CLUSTER_QUORUM = "myoverridezookeeperhost"; - // This is a hack that relies on the way The URL is re-constructed from Configuration to - // generate a Test connection for the MR jobs - protected static String TEST_ZK_QUORUM = - CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR; + protected static String TEST_URL = TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL; @Test /** @@ -76,7 +72,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " AS SELECT * FROM " + tableName + "\n"; conn.createStatement().execute(viewDdl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setOutputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setOutputTableName(configuration, viewName); PhoenixConfigurationUtil.setPhysicalTableName(configuration, viewName); PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new String[] {"A_STRING", "A_BINARY", "COL1"}); @@ -105,7 +101,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; conn.createStatement().execute(ddl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setOutputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); PhoenixConfigurationUtil.setPhysicalTableName(configuration, tableName); PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new String[] {"A_STRING", "A_BINARY", "COL1"}); @@ -132,7 +128,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; conn.createStatement().execute(ddl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setOutputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); PhoenixConfigurationUtil.setPhysicalTableName(configuration, tableName); final String upserStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration); @@ -153,7 +149,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; conn.createStatement().execute(ddl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setInputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setInputTableName(configuration, tableName); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ; @@ -175,7 +171,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; conn.createStatement().execute(ddl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setInputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setInputTableName(configuration, fullTableName); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName; @@ -195,7 +191,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; conn.createStatement().execute(ddl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setInputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setInputTableName(configuration, tableName); PhoenixConfigurationUtil.setSelectColumnNames(configuration, new String[]{"A_BINARY"}); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); @@ -215,7 +211,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { " (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n"; conn.createStatement().execute(ddl); final Configuration configuration = new Configuration (); - configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM); + PhoenixConfigurationUtil.setInputClusterUrl(configuration, TEST_URL); PhoenixConfigurationUtil.setSelectColumnNames(configuration,new String[]{"ID","VCARRAY"}); PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); PhoenixConfigurationUtil.setInputTableName(configuration, tableName); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java index bf260ff258..01f87167da 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java @@ -175,7 +175,11 @@ public class ConnectionlessTest { @Test public void testMultipleConnectionQueryServices() throws Exception { String url1 = getUrl(); - String url2 = url1 + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries"; + // Non-ZK registries don't have heuristics to handle missing URL elements + String url2 = + url1 + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries"; Connection conn1 = DriverManager.getConnection(url1); try { assertEquals(StringUtil.EMPTY_STRING, conn1.getMetaData().getUserName()); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java index 0719dc5a61..945f3e474a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java @@ -28,6 +28,8 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.TestZKConnectionRegistry; +import org.apache.phoenix.jdbc.ZKConnectionInfo; import org.apache.phoenix.parse.HintNode.Hint; import org.junit.Test; @@ -122,6 +124,9 @@ public class QueryUtilTest { Properties props = new Properties(); // standard lookup. this already checks if we set hbase.zookeeper.clientPort Configuration conf = new Configuration(false); + // Need this for HBase 3 where ZK is not the default + conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + ZKConnectionInfo.ZK_REGISTRY_NAME); conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost"); conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181"); String conn = QueryUtil.getConnectionUrl(props, conf);