PHOENIX-1653 Support separate clusters for MR jobs. Add support for the input and output formats of a Phoenix MapReduce job to point to separate clusters using override configuration settings. When no override is set, this defaults to the existing behavior (HConstants.ZOOKEEPER_QUORUM).
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7de8ee1e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7de8ee1e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7de8ee1e Branch: refs/heads/4.x-HBase-1.x Commit: 7de8ee1e914f5e0008ca9d983869757e4ca92b78 Parents: f4180fa Author: gjacoby <gjac...@salesforce.com> Authored: Fri Feb 27 16:49:14 2015 -0800 Committer: Gabriel Reid <gabri...@ngdata.com> Committed: Tue Mar 24 20:07:52 2015 +0100 ---------------------------------------------------------------------- .../phoenix/mapreduce/PhoenixInputFormat.java | 15 ++-- .../phoenix/mapreduce/PhoenixRecordWriter.java | 2 +- .../phoenix/mapreduce/index/IndexTool.java | 2 +- .../index/PhoenixIndexImportMapper.java | 2 +- .../phoenix/mapreduce/util/ConnectionUtil.java | 88 ++++++++++++++------ .../util/PhoenixConfigurationUtil.java | 72 ++++++++++++++-- .../mapreduce/util/PhoenixMapReduceUtil.java | 22 ++++- .../util/PhoenixConfigurationUtilTest.java | 60 ++++++++++++- .../pig/util/QuerySchemaParserFunction.java | 2 +- .../pig/util/SqlQueryToColumnInfoFunction.java | 2 +- 10 files changed, 219 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index a83b9ae..31759b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -98,15 +98,16 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr * @throws 
IOException * @throws SQLException */ - private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException { + private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) + throws IOException { Preconditions.checkNotNull(context); - try{ + try { final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); final Properties overridingProps = new Properties(); if(currentScnValue != null) { overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); } - final Connection connection = ConnectionUtil.getConnection(configuration,overridingProps); + final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); Preconditions.checkNotNull(selectStatement); final Statement statement = connection.createStatement(); @@ -116,9 +117,11 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr // Initialize the query plan so it sets up the parallel scans queryPlan.iterator(); return queryPlan; - } catch(Exception exception) { - LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage())); + } catch (Exception exception) { + LOG.error(String.format("Failed to get the query plan with error [%s]", + exception.getMessage())); throw new RuntimeException(exception); } - } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java index 4d26bf4..5843076 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java @@ -46,7 +46,7 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul private long numRecords = 0; public PhoenixRecordWriter(final Configuration configuration) throws SQLException { - this.conn = ConnectionUtil.getConnection(configuration); + this.conn = ConnectionUtil.getOutputConnection(configuration); this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration); final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); this.statement = this.conn.prepareStatement(upsertQuery); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index d93ef9c..300f575 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -163,7 +163,7 @@ public class IndexTool extends Configured implements Tool { final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable); final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable); - connection = ConnectionUtil.getConnection(configuration); + connection = ConnectionUtil.getInputConnection(configuration); if(!isValidIndexTable(connection, dataTable, indexTable)) { throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java index 7bf4bfc..30f6dc0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -73,7 +73,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration); final Properties overrideProps = new Properties (); overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); - connection = ConnectionUtil.getConnection(configuration,overrideProps); + connection = ConnectionUtil.getOutputConnection(configuration,overrideProps); connection.setAutoCommit(false); final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); this.pStatement = connection.prepareStatement(upsertQuery); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java index 3234967..e677104 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java @@ -24,49 +24,89 @@ import java.util.Iterator; import java.util.Map; import java.util.Properties; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.util.QueryUtil; -import 
com.google.common.base.Preconditions; - /** * Utility class to return a {@link Connection} . */ public class ConnectionUtil { + + + /** + * Retrieve the configured input Connection. + * + * @param conf configuration containing connection information + * @return the configured input connection + */ + public static Connection getInputConnection(final Configuration conf) throws SQLException { + return getInputConnection(conf, new Properties()); + } /** - * Returns the {@link Connection} from Configuration - * @param configuration - * @return - * @throws SQLException + * Retrieve the configured input Connection. + * + * @param conf configuration containing connection information + * @param props custom connection properties + * @return the configured input connection + */ + public static Connection getInputConnection(final Configuration conf , final Properties props) throws SQLException { + Preconditions.checkNotNull(conf); + return getConnection(PhoenixConfigurationUtil.getInputCluster(conf), + extractProperties(props, conf)); + } + + /** + * Create the configured output Connection. + * + * @param conf configuration containing the connection information + * @return the configured output connection */ - public static Connection getConnection(final Configuration configuration) throws SQLException { - return getConnection(configuration, null); + public static Connection getOutputConnection(final Configuration conf) throws SQLException { + return getOutputConnection(conf, new Properties()); } /** - * Used primarily in cases where we need to pass few additional/overriding properties - * @param configuration - * @param properties - * @return - * @throws SQLException + * Create the configured output Connection. 
+ * + * @param conf configuration containing the connection information + * @param props custom connection properties + * @return the configured output connection + */ + public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException { + Preconditions.checkNotNull(conf); + return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf), + extractProperties(props, conf)); + } + + /** + * Returns the {@link Connection} from a ZooKeeper cluster string. + * + * @param quorum a ZooKeeper quorum connection string + * @return a Phoenix connection to the given connection string */ - public static Connection getConnection(final Configuration configuration , final Properties properties) throws SQLException { - Preconditions.checkNotNull(configuration); - final Properties props = new Properties(); - Iterator<Map.Entry<String, String>> iterator = configuration.iterator(); + private static Connection getConnection(final String quorum, Properties props) throws SQLException { + Preconditions.checkNotNull(quorum); + return DriverManager.getConnection(QueryUtil.getUrl(quorum), props); + } + + /** + * Add properties from the given Configuration to the provided Properties. 
+ * + * @param props properties to which connection information from the Configuration will be added + * @param conf configuration containing connection information + * @return the input Properties value, with additional connection information from the + * given Configuration + */ + private static Properties extractProperties(Properties props, final Configuration conf) { + Iterator<Map.Entry<String, String>> iterator = conf.iterator(); if(iterator != null) { while (iterator.hasNext()) { Map.Entry<String, String> entry = iterator.next(); props.setProperty(entry.getKey(), entry.getValue()); } } - if(properties != null && !properties.isEmpty()) { - props.putAll(properties); - } - final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props); - return conn; + return props; } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index b8b64b2..6e0e5e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -17,18 +17,21 @@ */ package org.apache.phoenix.mapreduce.util; -import static org.apache.commons.lang.StringUtils.isNotEmpty; - import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.Map; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; @@ -42,10 +45,7 @@ import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; +import static org.apache.commons.lang.StringUtils.isNotEmpty; /** * A utility class to set properties on the {#link Configuration} instance. @@ -90,7 +90,11 @@ public final class PhoenixConfigurationUtil { /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */ public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor"; + + public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM = "phoenix.mapreduce.input.cluster.quorum"; + public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoneix.mapreduce.output.cluster.quorum"; + public enum SchemaType { TABLE, QUERY; @@ -165,6 +169,28 @@ public final class PhoenixConfigurationUtil { configuration.setLong(UPSERT_BATCH_SIZE, batchSize); } + /** + * Sets which HBase cluster a Phoenix MapReduce job should read from + * @param configuration + * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will read from + */ + public static void setInputCluster(final Configuration configuration, + final String quorum) { + Preconditions.checkNotNull(configuration); + configuration.set(MAPREDUCE_INPUT_CLUSTER_QUORUM, quorum); + } + + /** + * Sets which HBase cluster a Phoenix MapReduce job should write to + * @param configuration + * @param quorum ZooKeeper quorum string for HBase cluster 
the MapReduce job will write to + */ + public static void setOutputCluster(final Configuration configuration, + final String quorum) { + Preconditions.checkNotNull(configuration); + configuration.set(MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum); + } + public static Class<?> getInputClass(final Configuration configuration) { return configuration.getClass(INPUT_CLASS, NullDBWritable.class); } @@ -182,7 +208,7 @@ public final class PhoenixConfigurationUtil { if(isNotEmpty(columnInfoStr)) { return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); } - final Connection connection = ConnectionUtil.getConnection(configuration); + final Connection connection = ConnectionUtil.getOutputConnection(configuration); String upsertColumns = configuration.get(UPSERT_COLUMNS); List<String> upsertColumnList = null; if(isNotEmpty(upsertColumns)) { @@ -232,7 +258,7 @@ public final class PhoenixConfigurationUtil { } final String tableName = getInputTableName(configuration); Preconditions.checkNotNull(tableName); - final Connection connection = ConnectionUtil.getConnection(configuration); + final Connection connection = ConnectionUtil.getInputConnection(configuration); final List<String> selectColumnList = getSelectColumnList(configuration); final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList); final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList); @@ -276,7 +302,7 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE); if(batchSize <= 0) { - Connection conn = ConnectionUtil.getConnection(configuration); + Connection conn = ConnectionUtil.getOutputConnection(configuration); batchSize = ((PhoenixConnection) conn).getMutateBatchSize(); conn.close(); } @@ -309,6 +335,34 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); 
return configuration.get(OUTPUT_TABLE_NAME); } + + /** + * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read from + * @param configuration + * @return ZooKeeper quorum string + */ + public static String getInputCluster(final Configuration configuration) { + Preconditions.checkNotNull(configuration); + String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM); + if (quorum == null) { + quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM); + } + return quorum; + } + + /** + * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write to + * @param configuration + * @return ZooKeeper quorum string + */ + public static String getOutputCluster(final Configuration configuration) { + Preconditions.checkNotNull(configuration); + String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM); + if (quorum == null) { + quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM); + } + return quorum; + } public static void loadHBaseConfiguration(Job job) throws IOException { // load hbase-site.xml http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java index f1a7f5a..74d39bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -49,7 +49,7 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setInputClass(configuration,inputClass); PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE); } - + /** * * @param job @@ -64,9 +64,19 @@ public final class PhoenixMapReduceUtil { 
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery); PhoenixConfigurationUtil.setInputClass(configuration,inputClass); PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); + } /** + * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from + * @param job MapReduce Job + * @param quorum an HBase cluster's ZooKeeper quorum + */ + public static void setInputCluster(final Job job, final String quorum) { + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setInputCluster(configuration, quorum); + } + /** * * @param job * @param outputClass @@ -94,6 +104,16 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames); } + + /** + * A method to override which HBase cluster for {@link PhoenixOutputFormat} to write to + * @param job MapReduce Job + * @param quorum an HBase cluster's ZooKeeper quorum + */ + public static void setOutputCluster(final Job job, final String quorum) { + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setOutputCluster(configuration, quorum); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java index 33c7531..f8f2a63 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java @@ -23,13 +23,12 @@ import static org.junit.Assert.assertEquals; import 
java.sql.Connection; import java.sql.DriverManager; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import org.apache.phoenix.query.BaseConnectionlessQueryTest; -import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -39,7 +38,8 @@ import org.junit.Test; * Test for {@link PhoenixConfigurationUtil} */ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { - + private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost"; + private static final String OVERRIDE_CLUSTER_QUORUM = "myoverridezookeeperhost"; @Test public void testUpsertStatement() throws Exception { Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); @@ -121,4 +121,58 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { conn.close(); } } + + @Test + public void testInputClusterOverride() throws Exception { + final Configuration configuration = new Configuration(); + configuration.set(HConstants.ZOOKEEPER_QUORUM, ORIGINAL_CLUSTER_QUORUM); + String zkQuorum = PhoenixConfigurationUtil.getInputCluster(configuration); + assertEquals(zkQuorum, ORIGINAL_CLUSTER_QUORUM); + + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_INPUT_CLUSTER_QUORUM, + OVERRIDE_CLUSTER_QUORUM); + String zkQuorumOverride = PhoenixConfigurationUtil.getInputCluster(configuration); + assertEquals(zkQuorumOverride, OVERRIDE_CLUSTER_QUORUM); + + final Configuration configuration2 = new Configuration(); + PhoenixConfigurationUtil.setInputCluster(configuration2, OVERRIDE_CLUSTER_QUORUM); + String zkQuorumOverride2 = + PhoenixConfigurationUtil.getInputCluster(configuration2); + assertEquals(zkQuorumOverride2, 
OVERRIDE_CLUSTER_QUORUM); + + final Job job = Job.getInstance(); + PhoenixMapReduceUtil.setInputCluster(job, OVERRIDE_CLUSTER_QUORUM); + Configuration configuration3 = job.getConfiguration(); + String zkQuorumOverride3 = + PhoenixConfigurationUtil.getInputCluster(configuration3); + assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM); + + } + + @Test + public void testOutputClusterOverride() throws Exception { + final Configuration configuration = new Configuration(); + configuration.set(HConstants.ZOOKEEPER_QUORUM, ORIGINAL_CLUSTER_QUORUM); + String zkQuorum = PhoenixConfigurationUtil.getOutputCluster(configuration); + assertEquals(zkQuorum, ORIGINAL_CLUSTER_QUORUM); + + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_OUTPUT_CLUSTER_QUORUM, + OVERRIDE_CLUSTER_QUORUM); + String zkQuorumOverride = PhoenixConfigurationUtil.getOutputCluster(configuration); + assertEquals(zkQuorumOverride, OVERRIDE_CLUSTER_QUORUM); + + final Configuration configuration2 = new Configuration(); + PhoenixConfigurationUtil.setOutputCluster(configuration2, OVERRIDE_CLUSTER_QUORUM); + String zkQuorumOverride2 = + PhoenixConfigurationUtil.getOutputCluster(configuration2); + assertEquals(zkQuorumOverride2, OVERRIDE_CLUSTER_QUORUM); + + final Job job = Job.getInstance(); + PhoenixMapReduceUtil.setOutputCluster(job, OVERRIDE_CLUSTER_QUORUM); + Configuration configuration3 = job.getConfiguration(); + String zkQuorumOverride3 = + PhoenixConfigurationUtil.getOutputCluster(configuration3); + assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM); + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java index f0148a6..4f43811 100644 --- 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java @@ -59,7 +59,7 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!"); Connection connection = null; try { - connection = ConnectionUtil.getConnection(this.configuration); + connection = ConnectionUtil.getInputConnection(this.configuration); final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); final QueryPlan queryPlan = pstmt.compileQuery(selectStatement); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7de8ee1e/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java index 3ed35bb..2ea2c06 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java @@ -52,7 +52,7 @@ public final class SqlQueryToColumnInfoFunction implements Function<String,List< Connection connection = null; List<ColumnInfo> columnInfos = null; try { - connection = ConnectionUtil.getConnection(this.configuration); + connection = ConnectionUtil.getInputConnection(this.configuration); final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);