PHOENIX-1683 Support HBase HA Query(timeline-consistent region replica read) (Rajeshbabu Chintaguntla)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/742ca13d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/742ca13d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/742ca13d Branch: refs/heads/calcite Commit: 742ca13d356c13a0055bd63299940219e14827fb Parents: 3a0ce7d Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Fri Apr 3 14:12:25 2015 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Fri Apr 3 14:12:25 2015 +0530 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AlterSessionIT.java | 92 ++++++++++++++++++++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 8 ++ .../apache/phoenix/execute/BaseQueryPlan.java | 6 ++ .../apache/phoenix/iterate/ExplainTable.java | 7 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 15 +++- .../apache/phoenix/jdbc/PhoenixStatement.java | 56 ++++++++++++ .../phoenix/parse/AlterSessionStatement.java | 38 ++++++++ .../apache/phoenix/parse/ParseNodeFactory.java | 4 + .../org/apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 3 + .../java/org/apache/phoenix/util/JDBCUtil.java | 42 +++++++-- .../org/apache/phoenix/util/PhoenixRuntime.java | 5 ++ .../org/apache/phoenix/util/JDBCUtilTest.java | 15 ++++ 13 files changed, 284 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterSessionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterSessionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterSessionIT.java new file mode 100644 index 0000000..d97d6d4 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterSessionIT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * + * Basic tests for Alter Session Statements + * + */ +public class AlterSessionIT extends BaseHBaseManagedTimeIT { + + Connection testConn; + + @Before + public void initTable() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + testConn = DriverManager.getConnection(getUrl(), props); + assertEquals(Consistency.STRONG, ((PhoenixConnection)testConn).getConsistency()); + testConn.createStatement().execute("create table AlterSessionIT (col1 varchar primary key)"); + testConn.commit(); + } + + @Test + public void testUpdateConsistency() throws Exception { + try { + Statement st = testConn.createStatement(); + st.execute("alter session set Consistency = 'timeline'"); + ResultSet rs = st.executeQuery("explain select * from AlterSessionIT"); + assertEquals(Consistency.TIMELINE, ((PhoenixConnection)testConn).getConsistency()); + String queryPlan = QueryUtil.getExplainPlan(rs); + assertTrue(queryPlan.indexOf("TIMELINE") > 0); + + // turn off timeline read consistency + st.execute("alter session set Consistency = 'strong'"); + rs = st.executeQuery("explain select * from AlterSessionIT"); + queryPlan = QueryUtil.getExplainPlan(rs); + assertTrue(queryPlan.indexOf("TIMELINE") < 0); + } finally { + this.testConn.close(); + } + } + + @Test + public void testSetConsistencyInURL() throws Exception { + try { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl() + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + + "Consistency=TIMELINE", props); + assertEquals(Consistency.TIMELINE, ((PhoenixConnection)conn).getConsistency()); + Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery("explain select * from AlterSessionIT"); + String queryPlan = QueryUtil.getExplainPlan(rs); + assertTrue(queryPlan.indexOf("TIMELINE") > 0); + conn.close(); + } finally { + this.testConn.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index 0330a39..61d5afa 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -70,6 +70,7 @@ tokens KEY='key'; ALTER='alter'; COLUMN='column'; + SESSION='session'; TABLE='table'; ADD='add'; SPLIT='split'; @@ -372,6 +373,7 @@ non_select_node returns [BindableStatement ret] | s=alter_index_node | s=alter_table_node | s=trace_node + | s=alter_session_node | s=create_sequence_node | s=drop_sequence_node | s=update_statistics_node @@ -512,6 +514,12 @@ trace_node returns [TraceStatement ret] {ret = factory.trace(Tracing.isTraceOn(flag.getText()), s == null ? Tracing.isTraceOn(flag.getText()) ? 1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());} ; +// Parse an alter session statement. +alter_session_node returns [AlterSessionStatement ret] + : ALTER SESSION (SET p=properties) + {ret = factory.alterSession(p);} + ; + // Parse an alter table statement. alter_table_node returns [AlterTableStatement ret] : ALTER (TABLE | v=VIEW) t=from_table_name http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 4ca2dee..9b2d05a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -178,6 +178,12 @@ public abstract class BaseQueryPlan implements QueryPlan { // is resolved. // TODO: include time range in explain plan? PhoenixConnection connection = context.getConnection(); + + // set read consistency + if (context.getCurrentTable() != null + && context.getCurrentTable().getTable().getType() != PTableType.SYSTEM) { + scan.setConsistency(connection.getConsistency()); + } if (context.getScanTimeRange() == null) { Long scn = connection.getSCN(); if (scn == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 9756871..2fcc2fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -98,6 +99,11 @@ public abstract class ExplainTable { StringBuilder buf = new StringBuilder(prefix); ScanRanges scanRanges = context.getScanRanges(); boolean hasSkipScanFilter = false; + Scan scan = context.getScan(); + + if (scan.getConsistency() != Consistency.STRONG){ + buf.append("TIMELINE-CONSISTENCY "); + } if (hint.hasHint(Hint.SMALL)) { buf.append("SMALL "); } @@ -115,7 +121,6 @@ public abstract class ExplainTable { } planSteps.add(buf.toString()); - Scan scan = context.getScan(); Filter filter = scan.getFilter(); PageFilter pageFilter = null; if (filter != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 732dd8b..1277151 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -55,6 +55,7 @@ import java.util.concurrent.Executor; import javax.annotation.Nullable; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -137,7 +138,8 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private Sampler<?> sampler; private boolean readOnly = false; private Map<String, String> customTracingAnnotations = emptyMap(); - + private Consistency consistency = Consistency.STRONG; + static { Tracing.addTraceMetricsSource(); } @@ -205,6 +207,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd this.services.getProps().getBoolean( QueryServices.AUTO_COMMIT_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_COMMIT)); + this.consistency = JDBCUtil.getConsistencyLevel(url, this.info, this.services.getProps() + .get(QueryServices.CONSISTENCY_ATTRIB, + QueryServicesOptions.DEFAULT_CONSISTENCY_LEVEL)); this.tenantId = tenantId; this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps()); datePattern = this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); @@ -509,6 +514,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return isAutoCommit; } + public Consistency getConsistency() { + return this.consistency; + } + @Override public String getCatalog() throws SQLException { return ""; @@ -647,6 +656,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd this.isAutoCommit = isAutoCommit; } + public void setConsistency(Consistency val) { + this.consistency = val; + } + @Override public void setCatalog(String catalog) throws SQLException { throw new SQLFeatureNotSupportedException(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index f802ff4..ee6b016 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.call.CallRunner; @@ -73,6 +74,7 @@ import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.AlterIndexStatement; +import org.apache.phoenix.parse.AlterSessionStatement; import org.apache.phoenix.parse.BindableStatement; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.ColumnName; @@ -127,6 +129,7 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixContextExecutor; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; @@ -716,6 +719,54 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } } + private static class ExecutableAlterSessionStatement extends AlterSessionStatement implements CompilableStatement { + + public ExecutableAlterSessionStatement(Map<String,Object> props) { + super(props); + } + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + final StatementContext context = new StatementContext(stmt); + return new MutationPlan() { + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("ALTER SESSION")); + } + + @Override + public PhoenixConnection getConnection() { + return stmt.getConnection(); + } + + @Override + public MutationState execute() throws SQLException { + Object consistency = getProps().get(PhoenixRuntime.CONSISTENCY_ATTRIB.toUpperCase()); + if(consistency != null) { + if (((String)consistency).equalsIgnoreCase(Consistency.TIMELINE.toString())){ + getConnection().setConsistency(Consistency.TIMELINE); + } else { + getConnection().setConsistency(Consistency.STRONG); + } + } + return new MutationState(0, context.getConnection()); + } + }; + } + } + private static class ExecutableUpdateStatisticsStatement extends UpdateStatisticsStatement implements CompilableStatement { public ExecutableUpdateStatisticsStatement(NamedTableNode table, StatisticsCollectionScope scope, Map<String,Object> props) { @@ -915,6 +966,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } @Override + public AlterSessionStatement alterSession(Map<String, Object> props) { + return new ExecutableAlterSessionStatement(props); + } + + @Override public ExplainStatement explain(BindableStatement statement) { return new ExecutableExplainStatement(statement); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterSessionStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterSessionStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterSessionStatement.java new file mode 100644 index 0000000..5d944df --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterSessionStatement.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import java.util.Map; + +public class AlterSessionStatement extends MutableStatement { + + private final Map<String,Object> props; + + public AlterSessionStatement(Map<String,Object> props) { + this.props = props; + } + + @Override + public int getBindCount() { + return 0; + } + + public Map<String, Object> getProps(){ + return props; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index eb1768c..62db00a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -330,6 +330,10 @@ public class ParseNodeFactory { return new TraceStatement(isTraceOn, samplingRate); } + public AlterSessionStatement alterSession(Map<String,Object> props) { + return new AlterSessionStatement(props); + } + public TableName table(String schemaName, String tableName) { return TableName.createNormalized(schemaName,tableName); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 7a911e7..adf146d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -48,6 +48,8 @@ public interface QueryServices extends SQLCloseable { public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal"; public static final String SPOOL_DIRECTORY = "phoenix.spool.directory"; public static final String AUTO_COMMIT_ATTRIB = "phoenix.connection.autoCommit"; + // consistency configuration setting + public static final String CONSISTENCY_ATTRIB = "phoenix.connection.consistency"; /** * max size to spool the the result into http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 3561663..884b820 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -67,6 +67,7 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; @@ -191,6 +192,8 @@ public class QueryServicesOptions { private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName(); + public static final String DEFAULT_CONSISTENCY_LEVEL = Consistency.STRONG.toString(); + private final Configuration config; private QueryServicesOptions(Configuration config) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java index 06534d1..ddd9753 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java @@ -26,6 +26,7 @@ import java.util.Properties; import javax.annotation.Nullable; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PName; @@ -54,12 +55,15 @@ public class JDBCUtil { * @return the property value or null if not found */ public static String findProperty(String url, Properties info, String propName) { - String urlPropName = ";" + propName + "="; + String urlPropName = PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + propName.toUpperCase() + "="; + String upperCaseURL = url.toUpperCase(); String propValue = info.getProperty(propName); if (propValue == null) { - int begIndex = url.indexOf(urlPropName); + int begIndex = upperCaseURL.indexOf(urlPropName); if (begIndex >= 0) { - int endIndex = url.indexOf(';',begIndex + urlPropName.length()); + int endIndex = + upperCaseURL.indexOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR, begIndex + + urlPropName.length()); if (endIndex < 0) { endIndex = url.length(); } @@ -70,10 +74,13 @@ public class JDBCUtil { } public static String removeProperty(String url, String propName) { - String urlPropName = ";" + propName + "="; - int begIndex = url.indexOf(urlPropName); + String urlPropName = PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + propName.toUpperCase() + "="; + String upperCaseURL = url.toUpperCase(); + int begIndex = upperCaseURL.indexOf(urlPropName); if (begIndex >= 0) { - int endIndex = url.indexOf(';', begIndex + urlPropName.length()); + int endIndex = + upperCaseURL.indexOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR, begIndex + + urlPropName.length()); if (endIndex < 0) { endIndex = url.length(); } @@ -93,7 +100,7 @@ public class JDBCUtil { for (String propName : info.stringPropertyNames()) { result.put(propName, info.getProperty(propName)); } - String[] urlPropNameValues = url.split(";"); + String[] urlPropNameValues = url.split(Character.toString(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR)); if (urlPropNameValues.length > 1) { for (int i = 1; i < urlPropNameValues.length; i++) { String[] urlPropNameValue = urlPropNameValues[i].split("="); @@ -154,4 +161,25 @@ public class JDBCUtil { } return Boolean.valueOf(autoCommit); } + + /** + * Retrieve the value of the optional consistency read setting from JDBC url or connection + * properties. + * + * @param url JDBC url used for connecting to Phoenix + * @param info connection properties + * @param defaultValue default to return if ReadConsistency property is not set in the url + * or connection properties + * @return the boolean value supplied for the AutoCommit in the connection URL or properties, + * or the supplied default value if no AutoCommit attribute was provided + */ + public static Consistency getConsistencyLevel(String url, Properties info, String defaultValue) { + String consistency = findProperty(url, info, PhoenixRuntime.CONSISTENCY_ATTRIB); + + if(consistency != null && consistency.equalsIgnoreCase(Consistency.TIMELINE.toString())){ + return Consistency.TIMELINE; + } + + return Consistency.STRONG; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index b030510..e5ead10 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -142,6 +142,11 @@ public class PhoenixRuntime { public static final String AUTO_COMMIT_ATTRIB = "AutoCommit"; /** + * Use this connection property to explicitly set read consistency level on a new connection. + */ + public static final String CONSISTENCY_ATTRIB = "Consistency"; + + /** * Use this as the zookeeper quorum name to have a connection-less connection. This enables * Phoenix-compatible HFiles to be created in a map/reduce job by creating tables, * upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)} http://git-wip-us.apache.org/repos/asf/phoenix/blob/742ca13d/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java index 74b397f..fc29ad6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import java.util.Map; import java.util.Properties; +import org.apache.hadoop.hbase.client.Consistency; import org.junit.Test; public class JDBCUtilTest { @@ -101,4 +102,18 @@ public class JDBCUtilTest { props.setProperty("AutoCommit", "false"); assertFalse(JDBCUtil.getAutoCommit("localhost", props, false)); } + + @Test + public void testGetConsistency_TIMELINE_InUrl() { + assertTrue(JDBCUtil.getConsistencyLevel("localhost;Consistency=TIMELINE", new Properties(), + Consistency.STRONG.toString()) == Consistency.TIMELINE); + } + + @Test + public void testGetConsistency_TIMELINE_InProperties() { + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CONSISTENCY_ATTRIB, "TIMELINE"); + assertTrue(JDBCUtil.getConsistencyLevel("localhost", props, Consistency.STRONG.toString()) + == Consistency.TIMELINE); + } }