Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 cf4e33b33 -> 04df7bca0
Added ability to run async indexes when hbase cluster is in non-distributed mode or when mr is in local mode Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/04df7bca Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/04df7bca Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/04df7bca Branch: refs/heads/4.x-HBase-0.98 Commit: 04df7bca07607f9db7f7bba5f726bff6013b81e0 Parents: cf4e33b Author: tejamobref <t...@mobref.io> Authored: Tue Jul 12 01:37:12 2016 +0530 Committer: Thomas D'Silva <tdsi...@salesforce.com> Committed: Mon Jul 11 15:15:09 2016 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/AsyncIndexIT.java | 180 +++++++++++++++++++ .../coprocessor/MetaDataRegionObserver.java | 113 ++++++++++-- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 4 +- .../index/automation/PhoenixMRJobSubmitter.java | 13 +- .../apache/phoenix/query/QueryConstants.java | 18 +- .../org/apache/phoenix/query/QueryServices.java | 4 +- 6 files changed, 308 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java new file mode 100644 index 0000000..43d1bd9 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class AsyncIndexIT extends BaseTest { + + private static final String PERSON_TABLE_NAME = "PERSON"; + private static final String PERSON_TABLE_NAME_WITH_SCHEMA = "TEST.PERSON"; + private static final String TEST_SCHEMA = "TEST"; + + private static final String PERSON_TABLE_ASYNC_INDEX_INFO_QUERY = "SELECT " + + DATA_TABLE_NAME + ", " + TABLE_SCHEM + ", " + + TABLE_NAME + " FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + + SYSTEM_CATALOG_TABLE + "\"" + + " (" + ASYNC_CREATED_DATE + " " + + PDate.INSTANCE.getSqlTypeName() + ") " + " WHERE " + + COLUMN_NAME + " IS NULL and " + COLUMN_FAMILY + " IS NULL and " + + ASYNC_CREATED_DATE + " IS NOT NULL and " + + TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue() + + "' and DATA_TABLE_NAME='" + PERSON_TABLE_NAME + + "' and TABLE_SCHEM='" + TEST_SCHEMA + "' and " + + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + + PIndexState.BUILDING.getSerializedValue() + "'"; + + private void dropTable(Statement stmt) throws SQLException, IOException { + stmt.execute("DROP TABLE IF EXISTS " + PERSON_TABLE_NAME_WITH_SCHEMA); + } + + private void createTableAndLoadData(Statement stmt) throws SQLException { + String ddl = "CREATE TABLE " + PERSON_TABLE_NAME_WITH_SCHEMA + " (ID INTEGER NOT NULL PRIMARY KEY, " + + "FNAME VARCHAR, LNAME VARCHAR)"; + + stmt.execute(ddl); + stmt.execute("UPSERT INTO " + PERSON_TABLE_NAME_WITH_SCHEMA + " values(1, 'FIRST', 'F')"); + stmt.execute("UPSERT INTO " + PERSON_TABLE_NAME_WITH_SCHEMA + " values(2, 'SECOND', 'S')"); + } + + private void createAsyncIndex(Statement stmt) throws SQLException { + stmt.execute("CREATE INDEX FNAME_INDEX ON " + PERSON_TABLE_NAME_WITH_SCHEMA + "(FNAME) ASYNC"); + } + + private void dropAsyncIndex(Statement stmt) throws SQLException { + stmt.execute("DROP INDEX IF EXISTS FNAME_INDEX ON " + PERSON_TABLE_NAME_WITH_SCHEMA); + } + + @After + public void tearDown() throws Exception { + tearDownMiniCluster(); + } + + private void retryWithSleep(int maxRetries, int sleepInSecs, Statement stmt) throws Exception { + ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + // Wait for max of 5 retries with each retry of 5 sec sleep + int retries = 0; + while(retries <= maxRetries) { + Thread.sleep(sleepInSecs * 1000); + rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + if (!rs.next()) { + break; + } + retries++; + } + } + + @Test + public void testAsyncIndexBuilderNonDistributed() throws Exception { + setUpTestDriver(ReadOnlyProps.EMPTY_PROPS); + Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement(); + createTableAndLoadData(stmt); + createAsyncIndex(stmt); + + ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + assertTrue(rs.next()); + + retryWithSleep(5, 5, stmt); + + rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + assertFalse(rs.next()); + + dropAsyncIndex(stmt); + dropTable(stmt); + } + + @Test + public void testAsyncIndexBuilderNonDistributedMapreduceYarn() throws Exception { + Map<String,String> props = new HashMap<>(); + props.put(QueryServices.MAPRED_FRAMEWORK_NAME, "yarn"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + + Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement(); + createTableAndLoadData(stmt); + createAsyncIndex(stmt); + + ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + assertTrue(rs.next()); + + retryWithSleep(5, 5, stmt); + + rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + assertFalse(rs.next()); + + dropAsyncIndex(stmt); + dropTable(stmt); + } + + @Test + public void testAsyncIndexBuilderDistributed() throws Exception { + Map<String,String> props = new HashMap<>(); + props.put(QueryServices.HBASE_CLUSTER_DISTRIBUTED_ATTRIB, "true"); + props.put(QueryServices.MAPRED_FRAMEWORK_NAME, "yarn"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + + Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement(); + createTableAndLoadData(stmt); + createAsyncIndex(stmt); + + ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + assertTrue(rs.next()); + + retryWithSleep(5, 5, stmt); + + rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY); + assertTrue(rs.next()); + + dropAsyncIndex(stmt); + dropTable(stmt); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 8d7d8e5..bf396a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -17,8 +17,12 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.query.QueryConstants.ASYNC_INDEX_INFO_QUERY; + import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -87,10 +92,12 @@ import com.google.common.collect.Lists; */ public class MetaDataRegionObserver extends BaseRegionObserver { public static final Log LOG = LogFactory.getLog(MetaDataRegionObserver.class); - protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD; private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL; private boolean blockWriteRebuildIndex = false; + private final String HBASE_CLUSTER_DISTRIBUTED_CONFIG = "true"; + private final String MAPRED_FRAMEWORK_YARN_CONFIG = "yarn"; @Override public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, @@ -158,23 +165,109 @@ public class MetaDataRegionObserver extends BaseRegionObserver { }; (new Thread(r)).start(); - if (!enableRebuildIndex && !blockWriteRebuildIndex) { - LOG.info("Failure Index Rebuild is skipped by configuration."); - return; - } // turn off verbose deprecation logging Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation"); if (deprecationLogger != null) { deprecationLogger.setLevel(Level.WARN); } + try { Class.forName(PhoenixDriver.class.getName()); - // starts index rebuild schedule work - BuildIndexScheduleTask task = new BuildIndexScheduleTask(e.getEnvironment()); - // run scheduled task every 10 secs - executor.scheduleAtFixedRate(task, 10000, rebuildIndexTimeInterval, TimeUnit.MILLISECONDS); } catch (ClassNotFoundException ex) { - LOG.error("BuildIndexScheduleTask cannot start!", ex); + LOG.error("Phoenix Driver class is not found. Fix the classpath.", ex); + } + + Configuration conf = env.getConfiguration(); + String hbaseClusterDistributedMode = conf.get(QueryServices.HBASE_CLUSTER_DISTRIBUTED_ATTRIB); + String mapredFrameworkName = conf.get(QueryServices.MAPRED_FRAMEWORK_NAME); + + // In case of non-distributed mode of hbase service or local mode of map reduce service, add timer task to rebuild the async indexes + if ((hbaseClusterDistributedMode != null && !hbaseClusterDistributedMode.equals(HBASE_CLUSTER_DISTRIBUTED_CONFIG)) || + (mapredFrameworkName != null && !mapredFrameworkName.equals(MAPRED_FRAMEWORK_YARN_CONFIG))) + { + LOG.info("Enabling Async Index rebuilder"); + AsyncIndexRebuilderTask asyncIndexRebuilderTask = new AsyncIndexRebuilderTask(e.getEnvironment()); + // run async index rebuilder task every 10 secs to rebuild any newly created async indexes + executor.scheduleAtFixedRate(asyncIndexRebuilderTask, 10000, rebuildIndexTimeInterval, TimeUnit.MILLISECONDS); + } + + if (!enableRebuildIndex && !blockWriteRebuildIndex) { + LOG.info("Failure Index Rebuild is skipped by configuration."); + return; + } + + // starts index rebuild schedule work + BuildIndexScheduleTask task = new BuildIndexScheduleTask(e.getEnvironment()); + // run scheduled task every 10 secs + executor.scheduleAtFixedRate(task, 10000, rebuildIndexTimeInterval, TimeUnit.MILLISECONDS); + } + + /** + * Task runs periodically to re-build async indexes when hbase is running in non-distributed mode or + * when mapreduce is running in local mode + * + */ + public static class AsyncIndexRebuilderTask extends TimerTask { + RegionCoprocessorEnvironment env; + PhoenixConnection conn = null; + + public AsyncIndexRebuilderTask(RegionCoprocessorEnvironment env) { + this.env = env; + } + + @Override + public void run() { + try { + if (conn == null) { + conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + } + Statement s = conn.createStatement(); + ResultSet rs = s.executeQuery(ASYNC_INDEX_INFO_QUERY); + + PhoenixConnection alterIndexConnection = null; + while (rs.next()) { + String tableName = rs.getString(PhoenixDatabaseMetaData.DATA_TABLE_NAME); + String tableSchema = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM); + String indexName = rs.getString(PhoenixDatabaseMetaData.TABLE_NAME); + String tableNameWithSchema = SchemaUtil.getTableName(tableSchema, tableName); + + final PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(tableSchema, indexName)); + // this is set to ensure index tables remains consistent post population. + long maxTimeRange = pindexTable.getTimeStamp()+1; + + try { + final Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(maxTimeRange)); + alterIndexConnection = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class); + + // Alter index query for rebuilding async indexes + String alterIndexQuery = String.format("ALTER INDEX IF EXISTS %s ON %s REBUILD", indexName, tableNameWithSchema); + + LOG.info("Executing Rebuild Index Query:" + alterIndexQuery); + alterIndexConnection.createStatement().execute(alterIndexQuery); + } catch (Throwable t) { + LOG.error("AsyncIndexRebuilderTask failed during rebuilding index!", t); + } finally { + if (alterIndexConnection != null) { + try { + alterIndexConnection.close(); + } catch (SQLException ignored) { + LOG.debug("AsyncIndexRebuilderTask can't close alterIndexConnection", ignored); + } + } + } + } + } catch (Throwable t) { + LOG.error("AsyncIndexRebuilderTask failed!", t); + } finally { + if (conn != null) { + try { + conn.close(); + } catch (SQLException ignored) { + LOG.debug("AsyncIndexRebuilderTask can't close connection", ignored); + } + } + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index 40e03b9..d2dd94f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -290,7 +290,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable { // Normalize connInfo so that a url explicitly specifying versus implicitly inheriting // the default values will both share the same ConnectionQueryServices. if (zookeeperQuorum == null) { - zookeeperQuorum = props.get(QueryServices.ZOOKEEPER_QUARUM_ATTRIB); + zookeeperQuorum = props.get(QueryServices.ZOOKEEPER_QUORUM_ATTRIB); if (zookeeperQuorum == null) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL) .setMessage(this.toString()).build().buildException(); @@ -357,7 +357,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable { public ReadOnlyProps asProps() { Map<String, String> connectionProps = Maps.newHashMapWithExpectedSize(3); if (getZookeeperQuorum() != null) { - connectionProps.put(QueryServices.ZOOKEEPER_QUARUM_ATTRIB, getZookeeperQuorum()); + connectionProps.put(QueryServices.ZOOKEEPER_QUORUM_ATTRIB, getZookeeperQuorum()); } if (getPort() != null) { connectionProps.put(QueryServices.ZOOKEEPER_PORT_ATTRIB, getPort().toString()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java index 9b8d5c8..99f84ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.mapreduce.index.automation; +import static org.apache.phoenix.query.QueryConstants.ASYNC_INDEX_INFO_QUERY; + import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; @@ -67,15 +69,6 @@ public class PhoenixMRJobSubmitter { "/phoenix/automated-mr-index-build-leader-election"; private static final String AUTO_INDEX_BUILD_LOCK_NAME = "ActiveStandbyElectorLock"; - private static final String CANDIDATE_INDEX_INFO_QUERY = "SELECT " - + PhoenixDatabaseMetaData.INDEX_TYPE + ", " + PhoenixDatabaseMetaData.DATA_TABLE_NAME - + ", " + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " - + PhoenixDatabaseMetaData.TABLE_NAME + " FROM " - + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE " - + PhoenixDatabaseMetaData.COLUMN_NAME + " is null and " - + PhoenixDatabaseMetaData.TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue() - + "' and " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" - + PIndexState.BUILDING.getSerializedValue() + "'"; // TODO - Move this to a property? private static final int JOB_SUBMIT_POOL_TIMEOUT = 5; private Configuration conf; @@ -151,7 +144,7 @@ public class PhoenixMRJobSubmitter { public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws SQLException { Connection con = DriverManager.getConnection("jdbc:phoenix:" + zkQuorum); Statement s = con.createStatement(); - ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY); + ResultSet rs = s.executeQuery(ASYNC_INDEX_INFO_QUERY); Map<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<String, PhoenixAsyncIndex>(); while (rs.next()) { PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 9f8f58c..8bc1c5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -21,6 +21,7 @@ package org.apache.phoenix.query; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH; @@ -115,9 +116,12 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.schema.MetaDataSplitPolicy; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PDate; import org.apache.phoenix.util.ByteUtil; @@ -167,7 +171,19 @@ public interface QueryConstants { public static final byte[] TRUE = new byte[] {1}; - + public static final String ASYNC_INDEX_INFO_QUERY = "SELECT " + + DATA_TABLE_NAME + ", " + TABLE_SCHEM + ", " + + TABLE_NAME + ", " + ASYNC_CREATED_DATE + + " FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + + SYSTEM_CATALOG_TABLE + "\"" + + " (" + ASYNC_CREATED_DATE + " " + + PDate.INSTANCE.getSqlTypeName() + ") " + " WHERE " + + COLUMN_NAME + " IS NULL and " + COLUMN_FAMILY + " IS NULL and " + + ASYNC_CREATED_DATE + " IS NOT NULL and " + + TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue() + + "' and " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + + PIndexState.BUILDING.getSerializedValue() + "'"; + /** * Separator used between variable length keys for a composite key. * Variable length data types may not use this byte value. http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 6df3d1e..3f3518b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -107,7 +107,7 @@ public interface QueryServices extends SQLCloseable { public static final String REGIONSERVER_LEASE_PERIOD_ATTRIB = "hbase.regionserver.lease.period"; public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout"; public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir"; - public static final String ZOOKEEPER_QUARUM_ATTRIB = "hbase.zookeeper.quorum"; + public static final String ZOOKEEPER_QUORUM_ATTRIB = "hbase.zookeeper.quorum"; public static final String ZOOKEEPER_PORT_ATTRIB = "hbase.zookeeper.property.clientPort"; public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = "zookeeper.znode.parent"; public static final String DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB = "phoenix.distinct.value.compress.threshold"; @@ -116,6 +116,8 @@ public interface QueryServices extends SQLCloseable { public static final String MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxMetaDataCacheTimeToLiveMs"; public static final String MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB = "phoenix.coprocessor.maxMetaDataCacheSize"; public static final String MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB = "phoenix.client.maxMetaDataCacheSize"; + public static final String HBASE_CLUSTER_DISTRIBUTED_ATTRIB = "hbase.cluster.distributed"; + public static final String MAPRED_FRAMEWORK_NAME = "mapreduce.framework.name"; public static final String AUTO_UPGRADE_WHITELIST_ATTRIB = "phoenix.client.autoUpgradeWhiteList"; // Mainly for testing to force spilling