Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 cf4e33b33 -> 04df7bca0


Added ability to run async indexes when hbase cluster is in non-distributed 
mode or when mr is in local mode


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/04df7bca
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/04df7bca
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/04df7bca

Branch: refs/heads/4.x-HBase-0.98
Commit: 04df7bca07607f9db7f7bba5f726bff6013b81e0
Parents: cf4e33b
Author: tejamobref <t...@mobref.io>
Authored: Tue Jul 12 01:37:12 2016 +0530
Committer: Thomas D'Silva <tdsi...@salesforce.com>
Committed: Mon Jul 11 15:15:09 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/AsyncIndexIT.java     | 180 +++++++++++++++++++
 .../coprocessor/MetaDataRegionObserver.java     | 113 ++++++++++--
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     |   4 +-
 .../index/automation/PhoenixMRJobSubmitter.java |  13 +-
 .../apache/phoenix/query/QueryConstants.java    |  18 +-
 .../org/apache/phoenix/query/QueryServices.java |   4 +-
 6 files changed, 308 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java
new file mode 100644
index 0000000..43d1bd9
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncIndexIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class AsyncIndexIT extends BaseTest {
+
+    private static final String PERSON_TABLE_NAME = "PERSON";
+    private static final String PERSON_TABLE_NAME_WITH_SCHEMA = "TEST.PERSON";
+    private static final String TEST_SCHEMA = "TEST";
+
+    private static final String PERSON_TABLE_ASYNC_INDEX_INFO_QUERY = "SELECT "
+            + DATA_TABLE_NAME + ", " + TABLE_SCHEM + ", "
+            + TABLE_NAME + " FROM " + SYSTEM_CATALOG_SCHEMA + ".\""
+            + SYSTEM_CATALOG_TABLE + "\""
+            + " (" + ASYNC_CREATED_DATE + " "
+            + PDate.INSTANCE.getSqlTypeName() + ") " + " WHERE "
+            + COLUMN_NAME + " IS NULL and " + COLUMN_FAMILY + " IS NULL  and "
+            + ASYNC_CREATED_DATE + " IS NOT NULL and "
+            + TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue()
+            + "' and DATA_TABLE_NAME='" + PERSON_TABLE_NAME 
+            + "' and TABLE_SCHEM='" + TEST_SCHEMA + "' and "
+            + PhoenixDatabaseMetaData.INDEX_STATE + " = '" 
+            + PIndexState.BUILDING.getSerializedValue() + "'";
+
+    private void dropTable(Statement stmt) throws SQLException, IOException {
+        stmt.execute("DROP TABLE IF EXISTS " + PERSON_TABLE_NAME_WITH_SCHEMA);
+    }
+
+    private void createTableAndLoadData(Statement stmt) throws SQLException {
+        String ddl = "CREATE TABLE " + PERSON_TABLE_NAME_WITH_SCHEMA + " (ID 
INTEGER NOT NULL PRIMARY KEY, " +
+                     "FNAME VARCHAR, LNAME VARCHAR)";
+        
+        stmt.execute(ddl);
+        stmt.execute("UPSERT INTO " + PERSON_TABLE_NAME_WITH_SCHEMA + " 
values(1, 'FIRST', 'F')");
+        stmt.execute("UPSERT INTO " + PERSON_TABLE_NAME_WITH_SCHEMA + " 
values(2, 'SECOND', 'S')");
+    }
+
+    private void createAsyncIndex(Statement stmt) throws SQLException {
+        stmt.execute("CREATE INDEX FNAME_INDEX ON " + 
PERSON_TABLE_NAME_WITH_SCHEMA + "(FNAME) ASYNC");
+    }
+    
+    private void dropAsyncIndex(Statement stmt) throws SQLException {
+        stmt.execute("DROP INDEX IF EXISTS FNAME_INDEX ON " + 
PERSON_TABLE_NAME_WITH_SCHEMA);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        tearDownMiniCluster();
+    }
+
+    private void retryWithSleep(int maxRetries, int sleepInSecs, Statement 
stmt) throws Exception {
+        ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        // Wait for max of 5 retries with each retry of 5 sec sleep
+        int retries = 0;
+        while(retries <= maxRetries) {
+            Thread.sleep(sleepInSecs * 1000);
+            rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+            if (!rs.next()) {
+                break;
+            }
+            retries++;
+        }
+    }
+    
+    @Test
+    public void testAsyncIndexBuilderNonDistributed() throws Exception {
+        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+        Connection conn = DriverManager.getConnection(getUrl());
+        Statement stmt = conn.createStatement();
+        createTableAndLoadData(stmt);
+        createAsyncIndex(stmt);
+
+        ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        assertTrue(rs.next());
+
+        retryWithSleep(5, 5, stmt);
+
+        rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        assertFalse(rs.next());
+
+        dropAsyncIndex(stmt);
+        dropTable(stmt);
+    }
+    
+    @Test
+    public void testAsyncIndexBuilderNonDistributedMapreduceYarn() throws 
Exception {
+        Map<String,String> props = new HashMap<>();
+        props.put(QueryServices.MAPRED_FRAMEWORK_NAME, "yarn");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        
+        Connection conn = DriverManager.getConnection(getUrl());
+        Statement stmt = conn.createStatement();
+        createTableAndLoadData(stmt);
+        createAsyncIndex(stmt);
+
+        ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        assertTrue(rs.next());
+
+        retryWithSleep(5, 5, stmt);
+
+        rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        assertFalse(rs.next());
+
+        dropAsyncIndex(stmt);
+        dropTable(stmt);
+    }
+
+    @Test
+    public void testAsyncIndexBuilderDistributed() throws Exception {
+        Map<String,String> props = new HashMap<>();
+        props.put(QueryServices.HBASE_CLUSTER_DISTRIBUTED_ATTRIB, "true");
+        props.put(QueryServices.MAPRED_FRAMEWORK_NAME, "yarn");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        
+        Connection conn = DriverManager.getConnection(getUrl());
+        Statement stmt = conn.createStatement();
+        createTableAndLoadData(stmt);
+        createAsyncIndex(stmt);
+
+        ResultSet rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        assertTrue(rs.next());
+
+        retryWithSleep(5, 5, stmt);
+
+        rs = stmt.executeQuery(PERSON_TABLE_ASYNC_INDEX_INFO_QUERY);
+        assertTrue(rs.next());
+
+        dropAsyncIndex(stmt);
+        dropTable(stmt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 8d7d8e5..bf396a9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -17,8 +17,12 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.query.QueryConstants.ASYNC_INDEX_INFO_QUERY;
+
 import java.io.IOException;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -30,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -87,10 +92,12 @@ import com.google.common.collect.Lists;
  */
 public class MetaDataRegionObserver extends BaseRegionObserver {
     public static final Log LOG = 
LogFactory.getLog(MetaDataRegionObserver.class);
-    protected ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(1);
+    protected ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(2);
     private boolean enableRebuildIndex = 
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = 
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
     private boolean blockWriteRebuildIndex = false;
+    private final String HBASE_CLUSTER_DISTRIBUTED_CONFIG = "true";
+    private final String MAPRED_FRAMEWORK_YARN_CONFIG = "yarn";
 
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -158,23 +165,109 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
         };
         (new Thread(r)).start();
 
-        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
-            LOG.info("Failure Index Rebuild is skipped by configuration.");
-            return;
-        }
         // turn off verbose deprecation logging
         Logger deprecationLogger = 
Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation");
         if (deprecationLogger != null) {
             deprecationLogger.setLevel(Level.WARN);
         }
+
         try {
             Class.forName(PhoenixDriver.class.getName());
-            // starts index rebuild schedule work
-            BuildIndexScheduleTask task = new 
BuildIndexScheduleTask(e.getEnvironment());
-            // run scheduled task every 10 secs
-            executor.scheduleAtFixedRate(task, 10000, 
rebuildIndexTimeInterval, TimeUnit.MILLISECONDS);
         } catch (ClassNotFoundException ex) {
-            LOG.error("BuildIndexScheduleTask cannot start!", ex);
+            LOG.error("Phoenix Driver class is not found. Fix the classpath.", 
ex);
+        }
+         
+        Configuration conf = env.getConfiguration();
+        String hbaseClusterDistributedMode = 
conf.get(QueryServices.HBASE_CLUSTER_DISTRIBUTED_ATTRIB);
+        String mapredFrameworkName = 
conf.get(QueryServices.MAPRED_FRAMEWORK_NAME); 
+
+        // In case of non-distributed mode of hbase service or local mode of 
map reduce service, add timer task to rebuild the async indexes  
+        if ((hbaseClusterDistributedMode != null && 
!hbaseClusterDistributedMode.equals(HBASE_CLUSTER_DISTRIBUTED_CONFIG)) || 
+            (mapredFrameworkName != null && 
!mapredFrameworkName.equals(MAPRED_FRAMEWORK_YARN_CONFIG)))
+        {
+            LOG.info("Enabling Async Index rebuilder");
+            AsyncIndexRebuilderTask asyncIndexRebuilderTask = new 
AsyncIndexRebuilderTask(e.getEnvironment());
+            // run async index rebuilder task every 10 secs to rebuild any 
newly created async indexes
+            executor.scheduleAtFixedRate(asyncIndexRebuilderTask, 10000, 
rebuildIndexTimeInterval, TimeUnit.MILLISECONDS);
+        }
+
+        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
+            LOG.info("Failure Index Rebuild is skipped by configuration.");
+            return;
+        }
+
+        // starts index rebuild schedule work
+        BuildIndexScheduleTask task = new 
BuildIndexScheduleTask(e.getEnvironment());
+        // run scheduled task every 10 secs
+        executor.scheduleAtFixedRate(task, 10000, rebuildIndexTimeInterval, 
TimeUnit.MILLISECONDS);
+    }
+    
+    /**
+     * Task runs periodically to re-build async indexes when hbase is running 
in non-distributed mode or 
+     * when mapreduce is running in local mode
+     *
+     */
+    public static class AsyncIndexRebuilderTask extends TimerTask {
+        RegionCoprocessorEnvironment env;
+        PhoenixConnection conn = null;
+
+        public AsyncIndexRebuilderTask(RegionCoprocessorEnvironment env) {
+            this.env = env;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (conn == null) {
+                   conn = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+                }
+                Statement s = conn.createStatement();
+                ResultSet rs = s.executeQuery(ASYNC_INDEX_INFO_QUERY);
+
+                PhoenixConnection alterIndexConnection = null;
+                while (rs.next()) {
+                    String tableName = 
rs.getString(PhoenixDatabaseMetaData.DATA_TABLE_NAME);
+                    String tableSchema = 
rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM);
+                    String indexName = 
rs.getString(PhoenixDatabaseMetaData.TABLE_NAME);
+                    String tableNameWithSchema = 
SchemaUtil.getTableName(tableSchema, tableName);
+                    
+                    final PTable pindexTable = PhoenixRuntime.getTable(conn, 
SchemaUtil.getTableName(tableSchema, indexName));
+                    // this is set to ensure index tables remains consistent 
post population.
+                    long maxTimeRange = pindexTable.getTimeStamp()+1;
+
+                    try {
+                        final Properties props = new Properties();
+                        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(maxTimeRange));
+                        alterIndexConnection = 
QueryUtil.getConnectionOnServer(props, 
env.getConfiguration()).unwrap(PhoenixConnection.class);
+
+                        // Alter index query for rebuilding async indexes
+                        String alterIndexQuery = String.format("ALTER INDEX IF 
EXISTS %s ON %s REBUILD", indexName, tableNameWithSchema);
+    
+                        LOG.info("Executing Rebuild Index Query:" + 
alterIndexQuery);
+                        
alterIndexConnection.createStatement().execute(alterIndexQuery);
+                    } catch (Throwable t) {
+                        LOG.error("AsyncIndexRebuilderTask failed during 
rebuilding index!", t);
+                    } finally {
+                        if (alterIndexConnection != null) {
+                            try {
+                                alterIndexConnection.close();
+                            } catch (SQLException ignored) {
+                                LOG.debug("AsyncIndexRebuilderTask can't close 
alterIndexConnection", ignored);
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("AsyncIndexRebuilderTask failed!", t);
+            } finally {
+                if (conn != null) {
+                    try {
+                        conn.close();
+                    } catch (SQLException ignored) {
+                        LOG.debug("AsyncIndexRebuilderTask can't close 
connection", ignored);
+                    }
+                }
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 40e03b9..d2dd94f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -290,7 +290,7 @@ public abstract class PhoenixEmbeddedDriver implements 
Driver, SQLCloseable {
             // Normalize connInfo so that a url explicitly specifying versus 
implicitly inheriting
             // the default values will both share the same 
ConnectionQueryServices.
             if (zookeeperQuorum == null) {
-                zookeeperQuorum = 
props.get(QueryServices.ZOOKEEPER_QUARUM_ATTRIB);
+                zookeeperQuorum = 
props.get(QueryServices.ZOOKEEPER_QUORUM_ATTRIB);
                 if (zookeeperQuorum == null) {
                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
                     .setMessage(this.toString()).build().buildException();
@@ -357,7 +357,7 @@ public abstract class PhoenixEmbeddedDriver implements 
Driver, SQLCloseable {
         public ReadOnlyProps asProps() {
             Map<String, String> connectionProps = 
Maps.newHashMapWithExpectedSize(3);
             if (getZookeeperQuorum() != null) {
-                connectionProps.put(QueryServices.ZOOKEEPER_QUARUM_ATTRIB, 
getZookeeperQuorum());
+                connectionProps.put(QueryServices.ZOOKEEPER_QUORUM_ATTRIB, 
getZookeeperQuorum());
             }
             if (getPort() != null) {
                 connectionProps.put(QueryServices.ZOOKEEPER_PORT_ATTRIB, 
getPort().toString());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
index 9b8d5c8..99f84ec 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.mapreduce.index.automation;
 
+import static org.apache.phoenix.query.QueryConstants.ASYNC_INDEX_INFO_QUERY;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -67,15 +69,6 @@ public class PhoenixMRJobSubmitter {
             "/phoenix/automated-mr-index-build-leader-election";
     private static final String AUTO_INDEX_BUILD_LOCK_NAME = 
"ActiveStandbyElectorLock";
 
-    private static final String CANDIDATE_INDEX_INFO_QUERY = "SELECT "
-            + PhoenixDatabaseMetaData.INDEX_TYPE + ", " + 
PhoenixDatabaseMetaData.DATA_TABLE_NAME
-            + ", " + PhoenixDatabaseMetaData.TABLE_SCHEM + ", "
-            + PhoenixDatabaseMetaData.TABLE_NAME + " FROM "
-            + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE "
-            + PhoenixDatabaseMetaData.COLUMN_NAME + " is null and "
-            + PhoenixDatabaseMetaData.TABLE_TYPE + " = '" + 
PTableType.INDEX.getSerializedValue()
-            + "' and " + PhoenixDatabaseMetaData.INDEX_STATE + " = '"
-            + PIndexState.BUILDING.getSerializedValue() + "'";
     // TODO - Move this to a property?
     private static final int JOB_SUBMIT_POOL_TIMEOUT = 5;
     private Configuration conf;
@@ -151,7 +144,7 @@ public class PhoenixMRJobSubmitter {
     public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws 
SQLException {
         Connection con = DriverManager.getConnection("jdbc:phoenix:" + 
zkQuorum);
         Statement s = con.createStatement();
-        ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY);
+        ResultSet rs = s.executeQuery(ASYNC_INDEX_INFO_QUERY);
         Map<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<String, 
PhoenixAsyncIndex>();
         while (rs.next()) {
             PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9f8f58c..8bc1c5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.query;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
@@ -115,9 +116,12 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -167,7 +171,19 @@ public interface QueryConstants {
 
     public static final byte[] TRUE = new byte[] {1};
     
-
+    public static final String ASYNC_INDEX_INFO_QUERY = "SELECT "
+            + DATA_TABLE_NAME + ", " + TABLE_SCHEM + ", "
+            + TABLE_NAME + ", " + ASYNC_CREATED_DATE 
+            + " FROM " + SYSTEM_CATALOG_SCHEMA + ".\""
+            + SYSTEM_CATALOG_TABLE + "\""
+            + " (" + ASYNC_CREATED_DATE + " "
+            + PDate.INSTANCE.getSqlTypeName() + ") " + " WHERE "
+            + COLUMN_NAME + " IS NULL and " + COLUMN_FAMILY + " IS NULL  and "
+            + ASYNC_CREATED_DATE + " IS NOT NULL and "
+            + TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue()
+            + "' and " + PhoenixDatabaseMetaData.INDEX_STATE + " = '"
+            + PIndexState.BUILDING.getSerializedValue() + "'";
+    
     /**
      * Separator used between variable length keys for a composite key.
      * Variable length data types may not use this byte value.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04df7bca/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 6df3d1e..3f3518b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -107,7 +107,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String REGIONSERVER_LEASE_PERIOD_ATTRIB = 
"hbase.regionserver.lease.period";
     public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout";
     public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";
-    public static final String ZOOKEEPER_QUARUM_ATTRIB = 
"hbase.zookeeper.quorum";
+    public static final String ZOOKEEPER_QUORUM_ATTRIB = 
"hbase.zookeeper.quorum";
     public static final String ZOOKEEPER_PORT_ATTRIB = 
"hbase.zookeeper.property.clientPort";
     public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = 
"zookeeper.znode.parent";
     public static final String DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB = 
"phoenix.distinct.value.compress.threshold";
@@ -116,6 +116,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String 
MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB = 
"phoenix.coprocessor.maxMetaDataCacheTimeToLiveMs";
     public static final String MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB = 
"phoenix.coprocessor.maxMetaDataCacheSize";
     public static final String MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB = 
"phoenix.client.maxMetaDataCacheSize";
+    public static final String HBASE_CLUSTER_DISTRIBUTED_ATTRIB = 
"hbase.cluster.distributed";
+    public static final String MAPRED_FRAMEWORK_NAME = 
"mapreduce.framework.name";
 
     public static final String AUTO_UPGRADE_WHITELIST_ATTRIB = 
"phoenix.client.autoUpgradeWhiteList";
     // Mainly for testing to force spilling

Reply via email to